Python 数据恢复实战:U盘CHK文件及误删文件恢复深度探索
引言:数据丢失的梦魇与Python的援手
在数字时代,数据已成为我们生活和工作中不可或缺的一部分。无论是珍贵的家庭照片、重要的工作文档,还是多年的研究成果,它们的意外丢失都可能带来无法估量的损失和困扰。U盘(USB闪存驱动器)作为一种便捷的存储介质,因其便携性而被广泛使用,但也常常成为数据丢失的“重灾区”。不当的插拔、病毒攻击、文件系统损坏等原因都可能导致U盘中的数据无法访问,甚至出现大量的 FILExxxx.CHK 文件,让用户束手无策。此外,误删除文件也是一个常见的数据丢失场景。
当Windows的磁盘检查工具(CHKDSK)在修复文件系统错误时,它可能会将找到的但无法归属到正确文件或目录的数据片段保存为 .CHK 文件,存放在 FOUND.xxx 文件夹中。这些 CHK 文件本身包含了原始数据的残片,但失去了文件名和目录结构,使得直接识别和使用它们变得异常困难。同样,当文件被“删除”时,操作系统通常只是在文件系统中做了一些标记(例如,在FAT文件系统中将目录条目的首字节改为0xE5,在NTFS中标记MFT条目为未使用),而实际的数据块在被新数据覆盖之前仍然存在于磁盘上。
传统的商业数据恢复软件虽然功能强大,但往往价格不菲,且其内部工作原理对用户来说是一个黑箱。而Python作为一种功能强大、语法简洁且拥有丰富库支持的编程语言,为我们提供了一个 уникальный视角和工具集,使我们能够深入理解数据恢复的底层机制,并亲手编写脚本来尝试恢复丢失的数据。通过Python,我们可以直接与磁盘的原始字节流交互,解析文件系统结构,识别文件签名,甚至尝试重建损坏的文件片段。
第一部分:数据存储与文件系统核心原理
在尝试恢复任何数据之前,我们必须对数据是如何被存储以及如何被管理的有一个清晰且深入的理解。这包括了物理存储介质的基本工作方式,以及操作系统用于组织和访问这些数据的逻辑结构——即文件系统。
1.1 物理存储介质基础回顾
尽管我们主要已关注U盘,但其底层存储原理与其他基于闪存的设备(如SSD)以及传统的机械硬盘(HDD)有共通之处,也有其特性。
机械硬盘 (HDD – Hard Disk Drive):
结构: 由一个或多个高速旋转的磁盘片(Platters)组成,数据通过磁头(Heads)在盘片表面的磁性介质上进行读写。盘片被划分为同心圆的磁道(Tracks),每个磁道又被划分为扇区(Sectors)。扇区是磁盘读写的最小物理单位,通常大小为512字节或4KB(高级格式化硬盘)。
寻址: 通过CHS(Cylinder-Head-Sector,柱面-磁头-扇区)或更现代的LBA(Logical Block Addressing,逻辑块寻址)方式定位数据。LBA将磁盘视为一个线性的扇区序列。
数据读写: 磁头在盘片上移动到指定磁道,等待目标扇区旋转到磁头下方,然后进行读写。这个过程涉及到机械运动,因此HDD的随机访问速度相对较慢。
数据删除: 当文件被删除时,HDD本身通常不会立即擦除数据。操作系统仅修改文件系统中的元数据,标记相关扇区为可用。实际数据会保留在盘片上,直到新的数据写入并覆盖它们。
固态驱动器 (SSD – Solid State Drive) 与 U盘 (USB Flash Drive):
结构: 基于闪存(Flash Memory)技术,通常是NAND闪存。闪存没有机械运动部件,数据存储在浮栅晶体管(Floating Gate Transistors)中。
组织: 闪存被组织成块(Blocks),每个块又包含多个页(Pages)。页是闪存读取和编程(写入)的最小单位(例如4KB, 8KB, 16KB),而块是擦除的最小单位(例如128KB, 256KB, 512KB甚至更大)。
特性:
读写不对称: 读取页相对较快。写入页之前,如果该页已有数据,则不能直接覆盖,必须先擦除其所在的整个块。擦除操作相对耗时。
写入放大 (Write Amplification): 由于擦除单位是块,即使只修改页中的一小部分数据,也可能需要读取整个块的内容,修改后,擦除原块,再将更新后的整个块写回新的位置。这导致实际写入闪存的数据量大于用户请求写入的数据量。
损耗均衡 (Wear Leveling): 闪存单元的擦写次数有限(P/E Cycles)。为了延长SSD/U盘寿命,控制器会使用损耗均衡算法,将写入操作均匀分布到所有闪存块上,避免某些块过早损坏。
垃圾回收 (Garbage Collection): 后台进程,用于整理闪存空间,将有效数据从包含无效(已删除)数据的块中迁移到新的块,然后擦除旧块以备重用。
TRIM命令: 操作系统可以通知SSD/U盘哪些数据块不再使用(例如,文件被删除后)。SSD控制器接收到TRIM命令后,可以在垃圾回收过程中主动擦除这些数据块,以提高后续写入性能并辅助损耗均衡。这对数据恢复来说是个巨大的挑战,因为TRIM可能导致已删除数据被物理擦除。 U盘对TRIM的支持情况不一,且操作系统和U盘主控都需要支持。
1.2 文件系统的概念与作用
文件系统是操作系统用于明确存储设备(或分区)上的文件的方法和数据结构;即在存储设备上组织文件的方法。它使得用户能够以文件名和目录(文件夹)的层次结构来创建、访问、修改和删除数据,而无需关心数据在物理介质上的具体存储位置和细节。
文件系统的主要功能包括:
空间管理: 跟踪哪些存储空间已被分配,哪些是空闲的。
命名: 为文件和目录提供人类可读的名称。
目录结构: 组织文件和目录的层次关系。
元数据管理: 存储关于文件的信息,如文件名、大小、创建/修改时间、权限、文件在磁盘上的物理位置等。
API提供: 向应用程序提供访问文件系统功能的接口(如打开、读取、写入、关闭文件等)。
1.3 常见文件系统详解 (重点已关注U盘常用类型)
我们将重点已关注在U盘上常见的文件系统:FAT32、exFAT,并简要提及NTFS,因为NTFS也可能出现在大容量U盘或移动硬盘上。
1.3.1 FAT (File Allocation Table) 文件系统家族
FAT是最早也是最简单的文件系统之一,因其良好的兼容性,在U盘、存储卡等移动存储设备上仍被广泛使用。主要有FAT12, FAT16, FAT32和exFAT等版本。
核心组件:
引导扇区 (Boot Sector) / DBR (DOS Boot Record):
位于分区的第一个扇区(逻辑扇区0)。
包含文件系统的基本信息和参数,如每个扇区的字节数、每簇的扇区数、FAT表的数量、FAT表的大小、根目录的位置(FAT12/16)、总扇区数、文件系统类型标识等。
还包含一小段启动代码(引导程序),用于从该分区启动操作系统(虽然在U盘数据恢复场景中我们更关心其参数信息)。
对于数据恢复,正确解析引导扇区是理解文件系统布局的第一步。
文件分配表 (FAT – File Allocation Table):
FAT是FAT文件系统的核心,它本质上是一个大数组,数组中的每个条目对应存储区域中的一个簇(Cluster)。簇是文件系统分配磁盘空间的最小单位,由一个或多个连续的扇区组成。
FAT条目的含义:
0x0000 (FAT12/16) or 0x00000000 (FAT32): 表示该簇是空闲的,可用于存储新数据。
0xFFF7 (FAT12/16) or 0x0FFFFFF7 (FAT32): 表示该簇是一个坏簇,不应使用。
0xFFF8 – 0xFFFF (FAT12/16) or 0x0FFFFFF8 – 0x0FFFFFFF (FAT32): 表示该簇是文件中最后一个簇(EOF – End Of File)。
其他值: 表示文件中下一个簇的簇号。通过这个值,可以像链表一样将属于同一个文件的多个簇链接起来,形成文件分配链。
FAT表的副本: 通常会有两个(或更多)FAT表的副本,存放在引导扇区之后。这是为了数据冗余,如果主FAT表损坏,可以使用副本进行恢复。在实际操作中,操作系统通常只更新主FAT表,副本可能不是最新的。
根目录区 (Root Directory Area):
FAT12/FAT16: 根目录区的大小和位置是固定的,紧随最后一个FAT表之后。它包含固定数量的目录条目。由于大小固定,根目录下的文件和子目录数量有限。
FAT32: 根目录区不再是固定大小和位置的特殊区域,而是像普通子目录一样,由一个或多个簇组成,其起始簇号记录在引导扇区中。这使得FAT32的根目录可以存储更多的条目,并且可以扩展。
数据区 (Data Area):
位于根目录区(FAT12/16)或所有FAT表之后(FAT32的根目录也在数据区)。
这部分区域被划分为一个个的簇,用于存储实际的文件数据和子目录数据。
每个簇都有一个唯一的簇号(从2开始,簇0和簇1是保留的,不用于数据存储)。
目录条目 (Directory Entry):
无论是根目录还是子目录,它们的内容都是一系列32字节的目录条目。
短文件名目录条目 (SFN – Short File Name):
文件名 (Bytes 0-7): 8个字节的文件名,不足用空格 (0x20) 填充。
扩展名 (Bytes 8-10): 3个字节的扩展名,不足用空格填充。
属性 (Byte 11): 文件的属性字节,例如:
0x01: 只读 (Read-only)
0x02: 隐藏 (Hidden)
0x04: 系统 (System)
0x08: 卷标 (Volume Label) – 特殊条目,表示分区名
0x10: 子目录 (Subdirectory)
0x20: 存档 (Archive)
保留 (Bytes 12-21): 通常为0。NT系统可能用一部分存储创建时间和最后访问时间的低位。
最后写入时间 (Bytes 22-23): 编码格式。
最后写入日期 (Bytes 24-25): 编码格式。
起始簇号 (Bytes 26-27 for FAT12/16, higher 2 bytes at 20-21 for FAT32): 文件或子目录数据占用的第一个簇的簇号。对于FAT32,是低16位在26-27,高16位在20-21。这是连接到FAT表找到文件内容的关键。
文件大小 (Bytes 28-31): 文件内容的字节大小。对于子目录,此字段为0。
长文件名目录条目 (LFN – Long File Name):
为了支持超过8.3格式的长文件名和Unicode字符,FAT引入了LFN机制。
LFN条目是一种特殊的目录条目,其属性字节为 0x0F (只读 | 隐藏 | 系统 | 卷标)。
一个长文件名可能由多个LFN条目组成,每个LFN条目存储长文件名的一部分 (最多13个Unicode字符)。
LFN条目以逆序存储在实际的SFN条目之前。最后一个LFN条目(最靠近SFN条目的那个)的序号字段最高位被置1。
每个LFN条目都有一个校验和,基于其关联的SFN条目的短文件名计算,用于验证LFN和SFN的配对关系。
不支持LFN的旧系统会忽略这些属性为 0x0F 的条目,只读取SFN条目。
文件存储与分配:
当创建一个新文件时,操作系统会:
在父目录中查找或创建一个新的目录条目,填入文件名、属性、当前时间等。
在FAT表中查找足够的空闲簇来存储文件数据。
将找到的第一个空闲簇的簇号写入目录条目的起始簇号字段。
在FAT表中,将这个簇对应的条目更新为下一个分配给该文件的簇号(如果文件需要多个簇),或者标记为EOF(如果这是最后一个簇)。
将文件数据写入分配的簇中。
文件数据在数据区可能不是连续存储的,即文件可能是碎片化 (Fragmented) 的。FAT表负责将这些不连续的簇链接起来。
文件删除 (FAT):
当一个文件被删除时,操作系统通常执行以下操作:
修改目录条目: 将该文件对应的(SFN)目录条目的第一个字节改为 0xE5(一个特殊的标记,表示已删除)。对于LFN条目,它们的首字节也会被标记为 0xE5,但它们的校验和仍然有效。
清空FAT链 (通常): 将该文件在FAT表中所占用的所有簇对应的条目都清零(标记为未使用/空闲)。这意味着文件簇之间的链接关系丢失了。
数据区不变: 实际存储文件数据的簇内容不会被立即擦除。它们只是被标记为可用,等待被新的文件数据覆盖。
恢复可能性:
如果被删除文件的簇还没有被新数据覆盖,并且能够找到其原始的目录条目(即使首字节是0xE5),我们就能知道它的起始簇号和文件大小。
最大的挑战在于重建FAT链,因为这个链信息通常在删除时被清除了。
对于非碎片化的文件(即文件数据存储在连续的簇中),恢复相对容易,只需根据起始簇号和文件大小连续读取数据即可。
对于碎片化的文件,如果FAT链信息丢失,完美恢复非常困难。可能需要依赖文件内容本身的结构(例如,JPEG文件中的连续MCU块)或进行高级的数据雕刻。
FAT32的特点:
使用32位FAT条目(实际上只用了低28位表示簇号),支持更大的分区容量(理论上可达2TB,但Windows通常限制为32GB进行格式化)。
每个簇的大小可以更小,从而减少小文件占用的磁盘空间浪费(与FAT16相比)。
根目录是可扩展的。
广泛用于U盘和存储卡。
exFAT (Extended File Allocation Table):
由微软推出,旨在替代FAT32,特别适用于大容量闪存设备。
主要改进:
理论上支持极大的文件大小和分区容量(远超FAT32)。
改进了空闲空间分配的性能,使用位图(Allocation Bitmap)来跟踪簇的使用情况,类似于NTFS,可以更快地找到空闲簇。
引入了TexFAT(Transaction-safe FAT),用于提高文件操作的可靠性(可选特性)。
目录条目结构有所不同,更灵活。
支持访问控制列表 (ACLs),但实际应用不多。
恢复角度: exFAT的删除操作与FAT32类似,也会标记目录条目和清除(或标记为未使用)文件分配信息。恢复的挑战和策略与FAT32有共通之处,但需要理解其独特的元数据结构,如位图。
1.3.2 NTFS (New Technology File System)
NTFS是现代Windows操作系统(如Windows NT, 2000, XP, Vista, 7, 8, 10, 11)默认的文件系统。它比FAT复杂得多,提供了许多高级特性,如日志记录、安全性(ACLs)、压缩、加密、硬链接、稀疏文件等。大容量U盘或移动硬盘也可能格式化为NTFS。
核心组件:
引导扇区 (Boot Sector) / VBR (Volume Boot Record):
位于分区的第一个扇区。
包含NTFS版本信息、每个扇区的字节数、每簇的扇区数、MFT(主文件表)的起始逻辑簇号(LCN)、MFT镜像的起始簇号等关键参数。
同样包含引导代码。
主文件表 (MFT – Master File Table):
NTFS的核心。在NTFS中,一切皆文件,包括MFT本身也是一个文件(名为$MFT)。
MFT由一系列记录(MFT Records或File Records)组成,每个记录通常大小为1KB。
文件系统中的每个文件和目录(包括元数据文件自身)都在MFT中至少有一个记录。
MFT记录结构:
记录头 (Record Header): 包含记录的魔数(如FILE或BAAD表示坏记录)、更新序列号(用于保证写入一致性)、此记录的MFT条目号、硬链接计数、序列号(每次重用MFT记录时递增)等。
属性 (Attributes): MFT记录的主体由一系列属性组成。每个属性都有一个类型代码、一个可选的名称,以及属性值(数据)。属性可以是常驻的 (Resident) 或非常驻的 (Non-resident)。
常驻属性: 如果属性数据很小,可以直接存储在MFT记录内部。
非常驻属性: 如果属性数据较大,MFT记录中只存储指向数据区中实际数据块(称为数据运行 Data Runs)的指针。数据运行描述了数据在磁盘上的起始簇号和连续簇的数量。一个非常驻属性可能由多个数据运行组成(表示文件是碎片化的)。
重要的标准属性类型:
$STANDARD_INFORMATION (0x10): 包含文件的基本信息,如创建时间、修改时间、最后访问时间、文件属性(只读、隐藏、系统、存档等)、所有者ID、安全ID。
$ATTRIBUTE_LIST (0x20): (可选) 如果一个文件的所有属性无法容纳在一个MFT记录中,此属性会列出其他包含该文件属性的MFT记录。
$FILE_NAME (0x30): 包含文件名(Unicode)、父目录的MFT引用、文件的时间戳(可能与$STANDARD_INFORMATION中的不同步)、文件分配大小、实际大小、文件属性等。一个文件可以有多个$FILE_NAME属性(例如,一个长文件名和一个兼容DOS的短文件名,或者硬链接)。
$VOLUME_NAME (0x60): (用于$Volume文件) 卷标名。
$DATA (0x80): 包含文件的实际内容。这是最重要的属性。它可以是常驻的(对于非常小的文件)或非常驻的。
$INDEX_ROOT (0x90) 和 $INDEX_ALLOCATION (0xA0): 用于实现目录(B+树索引)。目录的内容(即其包含的文件和子目录的列表)被组织成索引。小目录的索引可能完全常驻在$INDEX_ROOT中;大目录则使用$INDEX_ALLOCATION指向数据区中的索引块。
$BITMAP (0xB0): (用于$Bitmap文件和目录的索引) 位图,用于跟踪簇的分配情况或索引条目的使用情况。NTFS的$Bitmap文件(MFT记录号通常是6)就使用此属性来记录卷上所有簇的分配状态(已用或空闲)。
元数据文件: NTFS文件系统本身也由一系列以$开头的特殊文件(元数据文件)组成,它们都有自己的MFT记录。例如:
$MFT (记录0): 主文件表本身。
$MftMirr (记录1): MFT的前几个(通常是4个)记录的副本,用于灾难恢复。
$LogFile (记录2): 日志文件,用于记录文件系统操作,保证文件系统的一致性(类似于事务日志)。
$Volume (记录3): 包含卷名、NTFS版本和卷状态等信息。
$AttrDef (记录4): 定义了卷上所有允许的属性类型及其特性。
$Bitmap (记录6): 卷簇分配位图。
$Boot (记录7): 引导扇区(VBR)。
$BadClus (记录8): 坏簇表。
$Secure (记录9): 安全描述符数据库。
$UpCase (记录10): Unicode字符大写转换表。
$Extend (目录,记录11): 包含如$Quota, $ObjId, $Reparse等可选的扩展文件系统功能文件。
文件存储与分配 (NTFS):
当创建文件时,NTFS会分配一个新的MFT记录(或重用一个标记为未使用的记录)。
文件的属性(如$STANDARD_INFORMATION, $FILE_NAME, $DATA)被添加到MFT记录中。
如果$DATA属性是非常驻的,NTFS会查询$Bitmap文件找到空闲的簇,并将这些簇分配给文件,更新$Bitmap,然后在$DATA属性中记录数据运行信息。
目录是通过$INDEX_ROOT和$INDEX_ALLOCATION属性实现的B+树。当向目录添加文件时,会更新这个索引。
文件删除 (NTFS):
当一个文件被删除时(例如,移到回收站再清空,或Shift+Delete):
MFT记录标记: 该文件在MFT中的记录会被标记为“未使用”(通常是记录头中的一个标志位被设置)。然而,记录的内容(包括所有属性,如文件名、时间戳、数据运行指针等)通常不会立即被清除。
** B i t m a p 更新 ∗ ∗ : 该文件所占用的簇在 ‘ Bitmap更新**: 该文件所占用的簇在` Bitmap更新∗∗:该文件所占用的簇在‘Bitmap`文件中会被标记为“空闲”。
目录索引更新: 该文件在父目录索引中的条目会被移除或标记为无效。
文件名: $FILE_NAME属性中的文件名通常保持不变,但其在目录索引中的链接断开了。
数据区不变: 类似于FAT,实际存储文件数据的簇内容不会被立即擦除,直到它们被新文件覆盖。
恢复可能性 (NTFS):
比FAT高很多: 由于MFT记录在删除时通常保留了大部分元数据(包括指向数据簇的指针),如果这些MFT记录本身没有被新的文件记录覆盖,并且数据簇也没有被覆盖,那么恢复已删除文件的机会就很大。
关键在于找到并解析MFT记录:
可以扫描整个MFT(或其可能存在的区域),查找被标记为“未使用”但看起来仍然包含有效文件信息的记录。
$FILE_NAME属性可以提供原始文件名和路径信息。
$DATA属性的数据运行信息是定位实际文件内容的关键。
碎片化文件: NTFS的MFT记录直接存储了数据运行,所以即使文件是碎片化的,只要MFT记录完好且数据簇未被覆盖,也能准确恢复。
MFT记录被覆盖: 如果一个已删除文件的MFT记录被新文件重用了,那么恢复该文件将变得非常困难,可能只能依赖于数据雕刻。
1.4 数据恢复的基本原则
无论使用何种文件系统,数据恢复都遵循一些基本原则:
立即停止使用: 一旦发现数据丢失或误删除,应立即停止对该存储设备(U盘、硬盘等)的任何写入操作。继续使用会增加原始数据被新数据覆盖的风险,从而降低恢复成功的概率。如果是系统盘,最好关闭计算机并从另一系统或恢复盘启动。
物理损坏优先处理: 如果数据丢失是由于物理损坏(例如,U盘无法识别、有异响、电路板损坏),软件恢复通常无能为力。此时需要专业的硬件级数据恢复服务。本文主要讨论逻辑层面的数据恢复。
镜像优先 (Write Blocker): 在进行任何恢复尝试之前,如果条件允许,最佳做法是创建一个原始存储设备(或分区)的完整逐扇区镜像(Image)到一个健康的目标磁盘上。然后对镜像文件进行恢复操作。这样可以避免在原始设备上操作引入新的风险,并且可以多次尝试不同的恢复方法。在专业的数字取证中,会使用硬件或软件写保护器(Write Blocker)来确保在创建镜像或分析原始设备时不会对其进行任何写入。
不要恢复到原始设备: 将恢复出来的文件保存到另一个独立的存储设备上,绝不能保存回正在进行数据恢复的原始设备,因为这会覆盖其他可能需要恢复的数据。
耐心和细致: 数据恢复可能是一个耗时且复杂的过程,需要耐心和对细节的已关注。
没有绝对的保证: 即使遵循了所有最佳实践,数据恢复也不能保证100%成功。成功率取决于多种因素,如数据丢失的原因、文件系统类型、数据被覆盖的程度、设备类型(SSD的TRIM)等。
理解了这些基础知识,我们就能更有针对性地去分析CHK文件是如何产生的,以及如何尝试恢复它们和其它已删除的文件。
第二部分:CHKDSK与CHK文件探秘
当Windows用户遇到文件系统错误,例如U盘无法正常读取、提示需要格式化、或者文件和目录出现异常时,一个常用的工具就是 CHKDSK (Check Disk)。这个命令行工具会扫描磁盘分区的文件系统结构,查找并尝试修复错误。在这个修复过程中,CHKDSK 可能会创建 FOUND.xxx 文件夹,并在其中生成一系列名为 FILExxxx.CHK 的文件。
2.1 CHKDSK (Check Disk) 工具概述
CHKDSK 是Windows操作系统内置的一个磁盘检查和修复工具。它可以用于检查FAT (FAT12, FAT16, FAT32), exFAT, 和NTFS文件系统的完整性。
启动方式:
命令行: chkdsk <驱动器盘符>: [参数] 例如 chkdsk E: /f
图形界面: 在“我的电脑”或“此电脑”中,右键点击目标驱动器 -> “属性” -> “工具”选项卡 -> “检查”。
主要参数:
<驱动器盘符>:: 指定要检查的驱动器,例如 E:。
/f: 修复磁盘上的错误。如果省略此参数,CHKDSK 只会报告错误,不进行修复。如果驱动器正在被使用(例如系统盘或包含打开的文件),/f 参数可能需要重启计算机才能执行。
/r: 定位坏扇区并恢复可读信息。此参数包含了 /f 的功能。扫描坏扇区是一个非常耗时的过程。
/x: (与 /f 一起使用时) 强制卸载卷(如果需要)。
/v: (在FAT/FAT32上) 显示磁盘上每个文件的完整路径和名称。(在NTFS上) 显示清除消息(如果有)。
/scan: (NTFS专用) 运行联机扫描,不需要卸载卷。
/spotfix: (NTFS专用) 运行联机点修复,同样不需要卸载卷。
CHKDSK 的工作阶段 (以NTFS为例,FAT类似但简化):
CHKDSK 在修复模式下通常会经历多个阶段:
阶段1:检查基本文件系统结构: 验证核心元数据文件(如NTFS的$MFT, $Bitmap等)的一致性。
阶段2:检查文件名链接: 验证目录结构和文件名的有效性,确保每个文件都能在目录索引中正确找到。
阶段3:检查安全描述符: 验证与文件和目录关联的安全信息(权限等)。
阶段4:查找坏簇 (如果使用了 /r 参数): 扫描用户文件数据中的坏簇。
阶段5:检查空闲空间 (如果使用了 /r 参数): 验证未分配簇的列表是否准确。
2.2 为何会产生CHK文件?
当CHKDSK在扫描和修复过程中发现文件系统存在以下类型的错误时,可能会生成CHK文件:
丢失的分配单元 (Lost Allocation Units / Lost Clusters):
这是产生CHK文件最常见的原因。
当FAT表或NTFS的$Bitmap中标记某些簇为“已使用”,但在任何目录条目或MFT记录中都找不到对这些簇的引用时,这些簇就被认为是“丢失的”。这意味着数据存在,但文件系统不知道它们属于哪个文件。
CHKDSK 无法确定这些丢失簇的原始文件名或它们在文件中的逻辑顺序(如果是多个不连续的丢失簇)。
为了不直接丢弃这些可能包含有价值数据的数据块,CHKDSK 会将每个找到的连续的丢失簇序列(或单个丢失簇)恢复为一个单独的 .CHK 文件。
交叉链接的文件 (Cross-linked Files):
当两个或多个文件(或目录)的文件分配信息(FAT链或NTFS数据运行)错误地指向了同一个或同一组簇时,就发生了交叉链接。这意味着文件系统认为这部分数据同时属于多个文件,这显然是不正确的。
CHKDSK 可能会尝试解决交叉链接。一种做法是复制共享的簇,为其中一个文件创建新的副本,然后将另一个文件指向原始簇。或者,它可能会将共享的簇数据保存为一个 .CHK 文件,并修复两个文件的分配信息,使其不再共享这些簇(这可能导致其中一个或两个文件数据不完整)。
无效的目录条目或MFT记录:
目录条目(FAT)或MFT记录(NTFS)本身可能损坏,例如指向无效的簇号、文件大小与分配的簇不匹配、时间戳无效等。
如果CHKDSK发现一个目录条目或MFT记录引用了一些数据簇,但该条目/记录本身已损坏到无法完全重建其原始文件关联,它可能会将这些数据簇恢复为 .CHK 文件。
损坏的目录结构:
如果目录的索引(FAT的子目录簇链,NTFS的$INDEX_ROOT / $INDEX_ALLOCATION)损坏,CHKDSK 可能无法正确遍历目录树。
在修复过程中,如果找到一些无法归属到有效目录结构中的文件数据,也可能被保存为 .CHK 文件。
2.3 CHK文件的本质和存储位置
本质: FILExxxx.CHK 文件 不是 某种特殊格式的文件。它们是 CHKDSK 从磁盘上直接提取出来的 原始数据块。这些数据块可能是任何类型文件的片段,例如文本文件的一部分、JPEG图像的一部分、程序代码的一部分、数据库文件的一部分,甚至是文件系统元数据本身的片段。
每个 .CHK 文件通常对应于 CHKDSK 找到的一个或多个连续的簇。
它们失去了原始的文件名、扩展名、目录路径以及大部分元数据(如创建/修改时间,这些信息原本存储在目录条目或MFT记录中)。
文件的大小就是 CHKDSK 恢复的那些簇的总大小。
命名:
CHK文件通常被命名为 FILE0000.CHK, FILE0001.CHK, FILE0002.CHK,以此类推,序号递增。
存储位置:
CHKDSK 会在被检查驱动器的根目录下创建一个或多个名为 FOUND.xxx 的隐藏文件夹(例如 FOUND.000, FOUND.001 等),并将生成的 .CHK 文件存放在这些文件夹中。
如果多次运行 CHKDSK 并且每次都生成了CHK文件,可能会看到多个 FOUND.xxx 文件夹。
2.4 CHK文件恢复的挑战
直接使用这些 .CHK 文件通常是不可行的,因为:
未知文件类型: 你不知道 FILE0000.CHK 原本是一个文档、一张图片,还是一个可执行文件。双击打开它,系统可能会提示选择程序,或者用错误的程序打开导致乱码。
内容可能不完整: 一个 .CHK 文件可能只包含原始文件的一部分,特别是如果原始文件是碎片化的,而 CHKDSK 只找到了其中一个片段。
无序性: 如果一个大文件被分割成了多个 .CHK 文件,你无法知道这些 .CHK 文件的正确顺序来将它们拼接回原始文件。
数量庞大: 有时 CHKDSK 可能会生成成百上千个 .CHK 文件,手动检查它们是不现实的。
因此,恢复CHK文件的核心任务是 识别这些原始数据块的内容类型,并尝试将它们恢复到可用的状态。
2.5 为何不直接依赖 CHKDSK 的“修复”?
虽然 CHKDSK 的目标是修复文件系统使其恢复可用状态,但它的“修复”有时是以牺牲部分数据为代价的。
当 CHKDSK 将数据保存为 .CHK 文件时,它实际上已经承认无法将这些数据完美地放回原始的文件结构中。
对于交叉链接,CHKDSK 的修复可能会导致其中一个或多个文件内容损坏或不完整。
如果文件系统的关键元数据严重损坏,CHKDSK 可能无法找到所有的文件数据,或者错误地“修复”它们。
因此,如果数据非常重要,在运行 CHKDSK /f 或 CHKDSK /r 之前,强烈建议先尝试使用数据恢复软件(或我们即将学习的Python脚本)对原始状态的驱动器(或其镜像)进行数据提取。 如果已经运行了 CHKDSK 并生成了 .CHK 文件,那么这些 .CHK 文件就成了我们最后的希望之一。
理解了 CHKDSK 和 CHK 文件的这些背景知识后,我们就可以开始思考如何用Python来处理这些“孤儿”数据了。其核心思路是通过分析 CHK 文件自身的内容来推断其原始类型。
第三部分:Python与底层磁盘I/O
要用Python进行数据恢复,我们首先需要学习如何让Python程序直接访问和读取存储设备的原始数据,而不是通过操作系统提供的文件和目录这种高级抽象。这种底层的磁盘I/O能力是进行文件签名分析、数据雕刻以及文件系统结构解析的基础。
3.1 磁盘/分区的表示
在操作系统层面,物理磁盘和逻辑分区有其特定的表示方法。
Windows:
物理磁盘: 表示为 \.PhysicalDriveX,其中 X 是从0开始的磁盘编号 (例如 \.PhysicalDrive0, \.PhysicalDrive1)。访问物理磁盘通常需要管理员权限。
逻辑分区/卷: 表示为 \.C:,\.D: 等,其中 C: 或 D: 是分配给该分区的盘符。访问分区的原始数据同样需要管理员权限。直接打开盘符(如open('C:', 'rb'))通常只能访问文件系统层面的内容,而不是原始扇区。
注意: 在Python中,反斜杠 是转义字符,所以路径字符串需要写为 \\.\PhysicalDrive0 或 r'\.PhysicalDrive0'。
Linux:
物理磁盘: 通常表示为 /dev/sdX (对于SATA/SCSI/USB磁盘) 或 /dev/nvmeXnY (对于NVMe SSD),其中 X 是字母 (a, b, c, …),Y是数字。例如 /dev/sda, /dev/sdb。
逻辑分区: 在磁盘设备名后附加分区号,例如 /dev/sda1, /dev/sda2。
访问这些设备文件通常需要root权限,或者用户需要属于特定的组(如 disk 组)。
3.2 使用Python打开原始设备/分区
Python的内置 open() 函数可以用来打开这些设备文件,就像打开普通文件一样,但需要以二进制模式 ('rb') 打开进行读取。
核心函数: io.open() 或内置 open()
模式: 必须是 'rb' (二进制读取)。写入 ('wb', 'ab') 原始设备非常危险,可能导致数据完全丢失,除非你非常清楚自己在做什么(例如,写回恢复的数据到另一个磁盘的镜像文件)。
缓冲: 为了性能,可以指定缓冲策略,但对于逐扇区读取,通常使用默认缓冲或小的自定义缓冲区。buffering=0 (仅二进制模式) 会禁用缓冲,直接进行系统调用,但这可能不是最高效的。通常,让操作系统处理块设备的缓冲是合理的,或者使用 io.BufferedReader.
3.2.1 在Windows上打开设备
在Windows上,由于权限和设备命名的特殊性,直接使用 open() 可能不够,或者需要特定的API调用来获取设备句柄。更可靠的方式是使用 ctypes 模块调用Windows API函数 CreateFileW。
import ctypes # 导入ctypes模块,用于调用C库函数
import os # 导入os模块,提供操作系统相关功能
# 定义Windows API常量
GENERIC_READ = 0x80000000 # 定义通用读权限常量
GENERIC_WRITE = 0x40000000 # 定义通用写权限常量 (恢复时一般不用,除非写镜像)
FILE_SHARE_READ = 0x00000001 # 定义文件共享读权限常量
FILE_SHARE_WRITE = 0x00000002 # 定义文件共享写权限常量
OPEN_EXISTING = 3 # 定义打开已存在文件的常量
FILE_ATTRIBUTE_NORMAL = 0x80 # 定义普通文件属性常量
FILE_FLAG_NO_BUFFERING = 0x20000000 # 定义无缓冲标志 (读取时可能需要对齐)
FILE_FLAG_SEQUENTIAL_SCAN = 0x04000000 # 定义顺序扫描标志 (提示系统优化)
INVALID_HANDLE_VALUE = -1 # 定义无效句柄值的常量
# Kernel32函数原型定义
# CreateFileW 用于打开或创建文件或I/O设备
CreateFileW = ctypes.windll.kernel32.CreateFileW # 获取CreateFileW函数的引用
CreateFileW.argtypes = [ # 定义CreateFileW函数的参数类型
ctypes.c_wchar_p, # lpFileName: 文件名或设备路径 (宽字符字符串指针)
ctypes.c_uint32, # dwDesiredAccess: 访问权限 (读、写等)
ctypes.c_uint32, # dwShareMode: 共享模式
ctypes.c_void_p, # lpSecurityAttributes: 安全属性 (通常为None)
ctypes.c_uint32, # dwCreationDisposition: 创建方式
ctypes.c_uint32, # dwFlagsAndAttributes: 文件属性和标志
ctypes.c_void_p # hTemplateFile: 模板文件句柄 (通常为None)
]
CreateFileW.restype = ctypes.c_void_p # 定义CreateFileW函数的返回值类型 (句柄)
# ReadFile 用于从文件或I/O设备读取数据
ReadFile = ctypes.windll.kernel32.ReadFile # 获取ReadFile函数的引用
ReadFile.argtypes = [ # 定义ReadFile函数的参数类型
ctypes.c_void_p, # hFile: 文件句柄
ctypes.c_void_p, # lpBuffer: 读取缓冲区指针
ctypes.c_uint32, # nNumberOfBytesToRead: 要读取的字节数
ctypes.POINTER(ctypes.c_uint32), # lpNumberOfBytesRead: 实际读取的字节数 (指针)
ctypes.c_void_p # lpOverlapped: 异步操作结构体指针 (通常为None用于同步)
]
ReadFile.restype = ctypes.c_bool # 定义ReadFile函数的返回值类型 (布尔值,成功或失败)
# SetFilePointerEx 用于设置文件指针位置 (64位偏移)
SetFilePointerEx = ctypes.windll.kernel32.SetFilePointerEx # 获取SetFilePointerEx函数的引用
SetFilePointerEx.argtypes = [ # 定义SetFilePointerEx函数的参数类型
ctypes.c_void_p, # hFile: 文件句柄
ctypes.c_int64, # liDistanceToMove: 要移动的距离 (64位整数)
ctypes.POINTER(ctypes.c_int64), # lpNewFilePointer: 新的文件指针位置 (指针,可选)
ctypes.c_uint32 # dwMoveMethod: 移动方法 (0: 文件开头, 1: 当前位置, 2: 文件末尾)
]
SetFilePointerEx.restype = ctypes.c_bool # 定义SetFilePointerEx函数的返回值类型 (布尔值)
# CloseHandle 用于关闭一个打开的对象句柄
CloseHandle = ctypes.windll.kernel32.CloseHandle # 获取CloseHandle函数的引用
CloseHandle.argtypes = [ctypes.c_void_p] # 定义CloseHandle函数的参数类型 (句柄)
CloseHandle.restype = ctypes.c_bool # 定义CloseHandle函数的返回值类型 (布尔值)
# 尝试获取设备大小的IOCTL (可选,但有助于确定读取范围)
# IOCTL_DISK_GET_LENGTH_INFO = 0x0007405C # 定义获取磁盘长度信息的IOCTL码
# class GET_LENGTH_INFORMATION(ctypes.Structure): # 定义GET_LENGTH_INFORMATION结构体
# _fields_ = [("Length", ctypes.c_int64)] # 结构体字段:Length (64位整数)
# DeviceIoControl = ctypes.windll.kernel32.DeviceIoControl # 获取DeviceIoControl函数的引用
def open_windows_device(device_path): # 定义打开Windows设备的函数
"""
使用CreateFileW打开Windows上的物理驱动器或分区。
例如: r'\.PhysicalDrive0' 或 r'\.C:'
需要管理员权限。
返回设备句柄,如果失败则返回INVALID_HANDLE_VALUE。
"""
print(f"Attempting to open device: {
device_path} with CreateFileW") # 打印尝试打开设备的信息
handle = CreateFileW(
device_path, # 设备路径
GENERIC_READ, # 请求读权限
FILE_SHARE_READ | FILE_SHARE_WRITE, # 允许其他进程读写该设备 (重要,否则可能打开失败)
None, # 安全属性,通常为None
OPEN_EXISTING, # 只打开已存在的设备
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_SEQUENTIAL_SCAN, # 普通属性并提示顺序扫描
# FILE_FLAG_NO_BUFFERING, # 如果使用无缓冲,读取大小和偏移必须是扇区大小的倍数
None # 模板文件句柄,通常为None
)
if handle == INVALID_HANDLE_VALUE or handle is None: # 如果获取句柄失败
error_code = ctypes.GetLastError() # 获取最后一个错误代码
print(f"Failed to open device {
device_path}. Error code: {
error_code}") # 打印打开失败信息及错误码
# 可以根据error_code提供更具体的错误信息,例如5是权限不足
if error_code == 5: # 如果错误码是5 (拒绝访问)
print("Error 5: Access Denied. Please ensure you are running this script with Administrator privileges.") # 打印权限不足提示
elif error_code == 2: # 如果错误码是2 (找不到文件)
print(f"Error 2: The system cannot find the file specified ({
device_path}). Check the device path.") # 打印找不到文件提示
return INVALID_HANDLE_VALUE # 返回无效句柄
print(f"Successfully opened device {
device_path}. Handle: {
handle}") # 打印成功打开设备的信息
return handle # 返回设备句柄
def close_windows_device(handle): # 定义关闭Windows设备的函数
"""关闭通过CreateFileW打开的设备句柄。"""
if handle != INVALID_HANDLE_VALUE and handle is not None: # 如果句柄有效
print(f"Closing device handle: {
handle}") # 打印关闭句柄信息
closed = CloseHandle(handle) # 调用CloseHandle关闭句柄
if not closed: # 如果关闭失败
error_code = ctypes.GetLastError() # 获取错误码
print(f"Failed to close device handle {
handle}. Error code: {
error_code}") # 打印关闭失败信息
# else:
# print(f"Device handle {handle} closed successfully.") # 打印成功关闭信息 (可省略)
# 示例使用 (需要管理员权限运行此脚本):
# if __name__ == "__main__":
# # 重要: 请极端小心选择device_path_win,错误的操作可能损坏数据
# # 建议先使用U盘的盘符,例如 r'\.E:' (假设E盘是你的U盘)
# # 或者物理驱动器号,例如 r'\.PhysicalDrive1' (你需要先确定哪个是你U盘)
# # 不要轻易尝试操作你的系统盘 (如 PhysicalDrive0 或 C:)
# device_path_win = r'\.E:' # 将E替换为你的U盘盘符
# # device_path_win = r'\.PhysicalDriveX' # 将X替换为正确的物理驱动器编号
# print("Requesting Administrator privileges if not already granted...") # 打印请求管理员权限信息
# # 在实际脚本中,你可能需要更复杂的逻辑来检测和请求管理员权限
# # 或者直接提示用户以管理员身份运行
# h_device = open_windows_device(device_path_win) # 调用函数打开设备
# if h_device != INVALID_HANDLE_VALUE: # 如果设备成功打开
# print(f"Device {device_path_win} opened. Ready for I/O operations.") # 打印设备已打开信息
# # 在这里可以进行读取操作,例如读取第一个扇区
# # close_windows_device(h_device) # 操作完成后关闭设备
# else:
# print(f"Could not open device {device_path_win}.") # 打印无法打开设备信息
# print("Make sure the device path is correct and you have Administrator privileges.") # 提示检查路径和权限
关于 FILE_FLAG_NO_BUFFERING: 如果使用此标志,那么 ReadFile 的缓冲区地址、读取的字节数以及文件指针的偏移量都必须是磁盘扇区大小的倍数。这增加了编程的复杂性,但可以避免操作系统缓存带来的影响,在某些取证场景下可能需要。对于一般的数据恢复读取,使用操作系统的默认缓冲(不设置此标志)通常是可以接受且更简单的。FILE_FLAG_SEQUENTIAL_SCAN 是一个给操作系统的提示,如果打算顺序读取大量数据,它可能有助于优化性能。
3.2.2 在Linux上打开设备
在Linux上,过程相对简单,可以直接使用内置的 open() 函数,但同样需要适当的权限(通常是root权限,或用户属于disk组)。
import os # 导入os模块
def open_linux_device(device_path): # 定义打开Linux设备的函数
"""
打开Linux上的块设备文件,例如 /dev/sdb 或 /dev/sdb1。
需要root权限或用户在disk组中。
返回文件对象,如果失败则返回None。
"""
print(f"Attempting to open device: {
device_path} with os.open") # 打印尝试打开设备的信息
try:
# os.O_RDONLY: 以只读方式打开
# os.O_BINARY: 在某些系统上确保二进制模式 (虽然在Linux上对块设备影响不大)
# os.O_SYNC: (可选) 确保写入同步,但我们是只读。某些情况下用于确保直接IO。
# os.O_DIRECT: (可选) 尝试禁用内核缓冲,类似于FILE_FLAG_NO_BUFFERING。
# 使用O_DIRECT时,读取的偏移、大小和内存缓冲区地址通常需要对齐到扇区边界。
# 这会增加复杂性,我们暂时不使用它。
fd = os.open(device_path, os.O_RDONLY | getattr(os, 'O_BINARY', 0)) # 以只读二进制方式打开设备文件,获取文件描述符
print(f"Successfully opened device {
device_path}. File Descriptor: {
fd}") # 打印成功打开设备的信息
# 将文件描述符包装成Python文件对象,以便使用read, seek等方法
# buffering=0 表示不使用Python层面的缓冲,直接操作文件描述符,但系统仍有其缓冲
# 更常见的做法是让Python管理一些缓冲,例如使用默认的缓冲大小
device_file = os.fdopen(fd, 'rb') # 将文件描述符包装成文件对象,以二进制读取模式
return device_file # 返回文件对象
except FileNotFoundError: # 捕获文件未找到错误
print(f"Error: Device {
device_path} not found.") # 打印设备未找到信息
return None # 返回None
except PermissionError: # 捕获权限错误
print(f"Error: Permission denied to open {
device_path}. Try running as root or with sudo.") # 打印权限被拒绝信息
return None # 返回None
except Exception as e: # 捕获其他异常
print(f"An unexpected error occurred while opening {
device_path}: {
e}") # 打印意外错误信息
return None # 返回None
def close_linux_device(device_file): # 定义关闭Linux设备的函数
"""关闭通过os.fdopen打开的文件对象。"""
if device_file: # 如果文件对象存在
print(f"Closing device file: {
device_file.name}") # 打印关闭文件信息
try:
device_file.close() # 关闭文件对象
# print(f"Device file {device_file.name} closed.") # 打印成功关闭信息 (可省略)
except Exception as e: # 捕获异常
print(f"Error closing device file {
device_file.name}: {
e}") # 打印关闭文件错误信息
# 示例使用 (需要root权限或适当的用户组权限):
# if __name__ == "__main__":
# # 重要: 小心选择device_path_linux,确保它是你的目标U盘
# # 可以使用命令如 `lsblk` 或 `fdisk -l` 来确定正确的设备路径
# # 例如: /dev/sdb (整个U盘), /dev/sdb1 (U盘的第一个分区)
# device_path_linux = "/dev/sdXN" # 将XN替换为你的U盘分区,例如 /dev/sdb1
# if device_path_linux == "/dev/sdXN": # 如果路径是占位符
# print("Please replace '/dev/sdXN' with the actual device path of your USB drive.") # 提示替换路径
# else:
# print(f"Attempting to access Linux device: {device_path_linux}") # 打印尝试访问设备信息
# usb_device_file = open_linux_device(device_path_linux) # 调用函数打开设备
# if usb_device_file: # 如果设备成功打开
# print(f"Device {device_path_linux} opened. Ready for I/O operations.") # 打印设备已打开信息
# # 在这里可以进行读取操作
# # close_linux_device(usb_device_file) # 操作完成后关闭设备
# else:
# print(f"Could not open device {device_path_linux}.") # 打印无法打开设备信息
关于 os.O_DIRECT (Linux): 类似于Windows的 FILE_FLAG_NO_BUFFERING,它尝试绕过内核的页缓存。使用它时,Python的 read() 操作的参数(如读取大小、内存缓冲区的对齐)必须满足特定的对齐要求(通常是逻辑块大小的倍数)。这会使得编程更复杂,但可以获得更直接的磁盘访问。对于大多数恢复场景,标准的缓冲读取已经足够,除非有特别的性能或取证需求。
3.3 读取扇区数据
一旦成功打开了原始设备/分区,我们就可以使用文件对象的 seek() 方法定位到特定的偏移量,然后使用 read() 方法读取一定数量的字节。扇区通常是512字节或4096字节(4KB)。在进行数据恢复时,了解扇区大小很重要。
3.3.1 获取扇区大小 (可选但推荐)
Windows: 可以使用 DeviceIoControl 和 IOCTL_DISK_GET_DRIVE_GEOMETRY_EX 来获取磁盘的几何信息,包括每个扇区的字节数。
Linux: 对于块设备,扇区大小(逻辑块大小)通常可以通过读取 /sys/block/<device_name>/queue/logical_block_size (例如 /sys/block/sdb/queue/logical_block_size) 或使用 blockdev --getss /dev/sdb 命令来确定。在Python中,也可以尝试读取一个已知大小(如512字节),如果文件系统元数据(如引导扇区)中包含扇区大小信息,则优先使用那个。
如果无法动态获取,可以先假设为512字节,因为这是非常常见的值。
3.3.2 实现读取函数 (跨平台考虑)
我们可以创建一个包装函数来处理Windows和Linux的差异。
import platform # 导入platform模块,用于获取操作系统信息
# (接续之前的Windows API和Linux函数定义)
DEFAULT_SECTOR_SIZE = 512 # 定义默认扇区大小为512字节
def read_sectors_from_device(device_handle_or_file, start_sector, num_sectors, sector_size=DEFAULT_SECTOR_SIZE): # 定义从设备读取扇区的函数
"""
从打开的设备句柄(Windows)或文件对象(Linux)读取指定数量的扇区。
:param device_handle_or_file: Windows设备句柄 或 Linux文件对象。
:param start_sector: int, 要开始读取的扇区号 (从0开始)。
:param num_sectors: int, 要读取的扇区数量。
:param sector_size: int, 每个扇区的大小 (字节)。
:return: bytes, 读取到的数据;如果失败则返回None。
"""
offset = start_sector * sector_size # 计算起始偏移量(字节)
bytes_to_read = num_sectors * sector_size # 计算总共要读取的字节数
data = None # 初始化数据为None
current_os = platform.system() # 获取当前操作系统名称
if current_os == "Windows": # 如果是Windows系统
if device_handle_or_file == INVALID_HANDLE_VALUE or device_handle_or_file is None: # 如果句柄无效
print("Error: Invalid device handle for Windows.") # 打印无效句柄错误
return None # 返回None
# Windows: 使用 SetFilePointerEx 和 ReadFile
new_pointer_long = ctypes.c_int64() # 创建一个64位整数用于存储新的文件指针位置
# FILE_BEGIN = 0
if not SetFilePointerEx(device_handle_or_file, ctypes.c_int64(offset), ctypes.byref(new_pointer_long), 0): # 设置文件指针
error_code = ctypes.GetLastError() # 获取错误码
print(f"Windows: SetFilePointerEx failed to seek to offset {
offset}. Error: {
error_code}") # 打印设置指针失败信息
return None # 返回None
# print(f"Windows: Seeked to offset {offset}, new pointer at {new_pointer_long.value}") # 打印寻址成功信息 (调试用)
buffer = ctypes.create_string_buffer(bytes_to_read) # 创建一个指定大小的字符缓冲区
bytes_read_long = ctypes.c_uint32() # 创建一个无符号32位整数用于存储实际读取的字节数
if ReadFile(device_handle_or_file, buffer, bytes_to_read, ctypes.byref(bytes_read_long), None): # 读取文件
if bytes_read_long.value == bytes_to_read: # 如果实际读取的字节数等于期望读取的字节数
data = buffer.raw[:bytes_read_long.value] # 获取读取到的原始字节数据
# print(f"Windows: Read {bytes_read_long.value} bytes successfully.") # 打印读取成功信息 (调试用)
elif bytes_read_long.value > 0 : # 如果读取到的字节数大于0但小于期望值 (可能到达文件末尾)
data = buffer.raw[:bytes_read_long.value] # 获取部分数据
print(f"Windows: Read {
bytes_read_long.value} bytes (less than expected {
bytes_to_read}). Possibly EOF.") # 打印部分读取信息
else: # 如果实际读取的字节数为0
print(f"Windows: ReadFile succeeded but read 0 bytes from offset {
offset}.") # 打印读取0字节信息
data = b'' # 返回空字节串
else: # 如果ReadFile失败
error_code = ctypes.GetLastError() # 获取错误码
print(f"Windows: ReadFile failed to read {
bytes_to_read} bytes from offset {
offset}. Error: {
error_code}") # 打印读取失败信息
# 常见的错误码23: 数据错误(循环冗余检查)。表示磁盘扇区可能损坏。
if error_code == 23:
print("ReadFile Error 23: Data error (cyclic redundancy check). Possible bad sector.")
return None # 返回None
elif current_os == "Linux": # 如果是Linux系统
if device_handle_or_file is None: # 如果文件对象无效
print("Error: Invalid device file object for Linux.") # 打印无效文件对象错误
return None # 返回None
try:
device_handle_or_file.seek(offset) # 移动文件指针到指定偏移量
# print(f"Linux: Seeked to offset {offset}") # 打印寻址成功信息 (调试用)
data = device_handle_or_file.read(bytes_to_read) # 读取指定字节数的数据
if len(data) < bytes_to_read and len(data) > 0: # 如果读取到的数据长度小于期望但大于0
print(f"Linux: Read {
len(data)} bytes (less than expected {
bytes_to_read}). Possibly EOF.") # 打印部分读取信息
elif len(data) == 0 and bytes_to_read > 0: # 如果期望读取但读到0字节
print(f"Linux: Read 0 bytes from offset {
offset} when {
bytes_to_read} were expected.") # 打印读取0字节信息
# else:
# print(f"Linux: Read {len(data)} bytes successfully.") # 打印读取成功信息 (调试用)
except IOError as e: # 捕获IO错误
# IOError可能是由于坏道等原因
print(f"Linux: IOError during seek/read from offset {
offset}. Error: {
e}") # 打印IO错误信息
return None # 返回None
except Exception as e: # 捕获其他异常
print(f"Linux: Unexpected error during seek/read: {
e}") # 打印意外错误信息
return None # 返回None
else: # 如果是不支持的操作系统
print(f"Error: Unsupported operating system '{
current_os}' for raw disk access.") # 打印不支持的操作系统信息
return None # 返回None
return data # 返回读取到的数据
# --- 示例如何使用 read_sectors_from_device ---
# if __name__ == "__main__":
# current_os_name = platform.system() # 获取当前操作系统名称
# device_handle = None # 初始化设备句柄/文件对象
#
# # 根据操作系统选择设备路径和打开方式
# if current_os_name == "Windows":
# # !!! 再次强调: 确保这是你要操作的U盘,而不是系统盘 !!!
# # 例如: r'\.E:' (E盘是U盘) 或 r'\.PhysicalDrive1' (PhysicalDrive1是U盘)
# win_dev_path = r'\.E:' # 修改为你的U盘对应的盘符或物理驱动器
# if win_dev_path == r'\.E:' and not os.path.exists('E:\'): # 简单检查盘符是否存在 (不完全可靠)
# print(f"Path {win_dev_path} seems invalid as drive E: does not exist. Please check.")
# else:
# device_handle = open_windows_device(win_dev_path) # 打开Windows设备
# elif current_os_name == "Linux":
# # !!! 确保这是你要操作的U盘 !!!
# # 例如: /dev/sdb1 (U盘的第一个分区)
# linux_dev_path = "/dev/sdb1" # 修改为你的U盘对应的设备路径
# if not os.path.exists(linux_dev_path): # 检查设备文件是否存在
# print(f"Path {linux_dev_path} does not exist. Please check.")
# else:
# device_handle = open_linux_device(linux_dev_path) # 打开Linux设备
# else:
# print(f"OS {current_os_name} not directly supported by this example for raw access.") # 打印不支持的操作系统
#
# if device_handle and (device_handle != INVALID_HANDLE_VALUE if current_os_name == "Windows" else True): # 如果设备成功打开
# print("-" * 30) # 打印分隔线
# print("Attempting to read the first sector (Boot Sector)...") # 打印尝试读取第一个扇区信息
# # 读取第一个扇区 (扇区0),读取1个扇区,假设扇区大小为512字节
# boot_sector_data = read_sectors_from_device(device_handle, start_sector=0, num_sectors=1, sector_size=DEFAULT_SECTOR_SIZE) # 读取扇区数据
#
# if boot_sector_data: # 如果成功读取到数据
# print(f"Successfully read {len(boot_sector_data)} bytes for the boot sector.") # 打印成功读取字节数
# # 可以使用hexdump来查看扇区内容
# import hexdump # 导入hexdump模块
# print("Hexdump of the first 64 bytes of the boot sector:") # 打印引导扇区前64字节的十六进制转储
# hexdump.hexdump(boot_sector_data[:64]) # 十六进制转储数据的前64字节
#
# # 尝试读取FAT表或MFT的开始部分 (这需要知道文件系统类型和布局)
# # 例如,对于一个FAT32分区,引导扇区会告诉我们FAT表的位置
# # 假设引导扇区显示保留扇区数为32,每个FAT表占2000个扇区
# # 则第一个FAT表从扇区32开始
# # fat_start_sector = 32 # (这只是一个例子,需要从引导扇区解析)
# # print(f"
Attempting to read first sector of FAT (example sector {fat_start_sector})...")
# # fat_sector_data = read_sectors_from_device(device_handle, fat_start_sector, 1)
# # if fat_sector_data:
# # print(f"Read {len(fat_sector_data)} bytes from presumed FAT start.")
# # hexdump.hexdump(fat_sector_data[:64])
#
# else: # 如果读取失败
# print("Failed to read the boot sector.") # 打印读取引导扇区失败信息
#
# print("-" * 30) # 打印分隔线
# # 关闭设备
# if current_os_name == "Windows": # 如果是Windows系统
# close_windows_device(device_handle) # 关闭Windows设备
# elif current_os_name == "Linux": # 如果是Linux系统
# close_linux_device(device_handle) # 关闭Linux设备
# else: # 如果打开设备失败
# print("Device handle/file object is not valid. Cannot proceed with reading.") # 打印设备句柄/文件对象无效信息
#
代码解释与注意事项:
跨平台处理: read_sectors_from_device 函数内部通过 platform.system() 判断操作系统,然后调用相应的API(Windows的 SetFilePointerEx/ReadFile 或 Linux的 seek/read)。
错误处理: 每个平台的API调用都包含了基本的错误检查和打印。在实际的数据恢复工具中,这些错误应该被更详细地记录到日志文件中。特别是Windows的 GetLastError() 可以提供具体的错误原因。Linux的 IOError 尤其需要已关注,因为它可能表示磁盘坏道。
管理员/Root权限: 重复强调,执行这些底层磁盘访问操作几乎总是需要提升的权限。脚本需要以管理员身份(Windows)或root用户(Linux,或通过sudo)运行。
设备路径的准确性: 务必确保传递给 open_windows_device 或 open_linux_device 的设备路径是准确无误的,指向你希望进行数据恢复的U盘或其他非系统关键设备。操作错误的设备可能导致灾难性的数据丢失。
扇区大小: DEFAULT_SECTOR_SIZE 设为512。对于现代磁盘(尤其是4K Native磁盘或使用4K模拟扇区的U盘),真实的扇区大小可能是4096字节。如果文件系统(如引导扇区)中明确指出了扇区大小,应优先使用该值。读取操作的 bytes_to_read 和 offset 都应该基于正确的扇区大小计算,以确保读取到的是完整的逻辑单元。
读取错误 (坏道): 当 ReadFile (Windows) 或 read() (Linux) 遇到物理损坏的扇区(坏道)时,它们通常会失败并返回错误或抛出异常(如Windows的错误码23,Linux的IOError: [Errno 5] Input/output error)。一个健壮的数据恢复程序需要能够处理这种情况,例如:
记录坏道的位置。
尝试跳过坏道,继续读取后续的扇区。
如果一个文件的数据跨越了坏道,那么该文件可能只能部分恢复。
在读取大量数据时,可以实现一个循环,尝试逐个扇区读取,如果某个扇区读取失败,则用占位符数据(如全零字节)填充该扇区的缓冲区,并记录错误,然后继续读取下一个扇区。
3.4 创建磁盘/分区镜像 (可选但强烈推荐)
如前所述,直接在原始故障设备上进行恢复操作是有风险的。创建一个逐位的磁盘镜像是更安全的方法。Python也可以用来创建这样的镜像文件,尽管对于非常大的驱动器,使用专门的工具如 dd (Linux) 或 FTK Imager, dd for Windows 等可能更高效。
def create_disk_image(source_device_handle_or_file, output_image_path, sector_size=DEFAULT_SECTOR_SIZE, chunk_sectors=128, device_size_bytes=None): # 定义创建磁盘镜像的函数
"""
从源设备创建一个逐位拷贝的镜像文件。
:param source_device_handle_or_file: 打开的源设备句柄(Win)或文件对象(Lin)。
:param output_image_path: str, 输出镜像文件的路径。
:param sector_size: int, 扇区大小。
:param chunk_sectors: int, 每次读取/写入的扇区数量 (块大小)。
:param device_size_bytes: int, (可选) 源设备的总大小(字节)。如果为None,会尝试读取直到EOF。
:return: bool, True如果成功,False如果失败。
"""
current_os = platform.system() # 获取当前操作系统名称
print(f"Starting to create image of device to '{
output_image_path}'...") # 打印开始创建镜像信息
print(f"Sector size: {
sector_size} bytes, Chunk size: {
chunk_sectors} sectors ({
chunk_sectors * sector_size} bytes)") # 打印扇区大小和块大小信息
try:
with open(output_image_path, 'wb') as img_file: # 以二进制写入模式打开输出镜像文件
total_bytes_written = 0 # 初始化总写入字节数
start_sector = 0 # 初始化起始扇区
read_errors = 0 # 初始化读取错误计数
while True: # 无限循环,直到读取完成或遇到问题
# print(f"Imaging: Reading chunk starting at sector {start_sector}...") # 打印正在读取的块信息 (调试用,会产生大量输出)
data_chunk = read_sectors_from_device(source_device_handle_or_file, start_sector, chunk_sectors, sector_size) # 从源设备读取一块数据
if data_chunk is None: # 如果读取数据块失败 (可能遇到坏道)
print(f"Warning: Failed to read chunk at sector {
start_sector}. Filling with zeros.") # 打印读取块失败警告
# 用零填充这个块,并继续 (或者可以选择终止)
img_file.write(b'x00' * (chunk_sectors * sector_size)) # 向镜像文件写入零字节
total_bytes_written += (chunk_sectors * sector_size) # 累加写入字节数 (即使是零)
read_errors += 1 # 读取错误计数加1
elif not data_chunk: # 如果读取到空数据 (通常表示到达设备末尾)
print("Reached end of source device (or read empty chunk).") # 打印到达设备末尾信息
break # 跳出循环
else: # 如果成功读取到数据块
img_file.write(data_chunk) # 将数据块写入镜像文件
total_bytes_written += len(data_chunk) # 累加实际写入的字节数
start_sector += chunk_sectors # 更新下一个起始扇区号
# 打印进度 (可以根据需要调整频率)
if start_sector % (chunk_sectors * 10) == 0: # 每10个块打印一次进度
progress_mb = total_bytes_written / (1024 * 1024) # 计算已处理的MB数
print(f"Progress: {
progress_mb:.2f} MB written. Current sector: {
start_sector}. Read errors: {
read_errors}") # 打印进度信息
if device_size_bytes is not None and total_bytes_written >= device_size_bytes: # 如果指定了设备大小且已达到
print(f"Reached specified device size of {
device_size_bytes} bytes.") # 打印达到指定设备大小信息
break # 跳出循环
print(f"Disk imaging finished. Total bytes written: {
total_bytes_written} ({
total_bytes_written / (1024*1024):.2f} MB).") # 打印镜像完成信息
if read_errors > 0: # 如果有读取错误
print(f"Warning: Encountered {
read_errors} read error(s) during imaging. Image may be incomplete or contain zeroed blocks.") # 打印读取错误警告
return True # 返回成功标志
except IOError as e: # 捕获IO错误 (通常是写入镜像文件时发生)
print(f"IOError during disk imaging (likely writing to output file '{
output_image_path}'): {
e}") # 打印IO错误信息
return False # 返回失败标志
except Exception as e: # 捕获其他异常
print(f"An unexpected error occurred during disk imaging: {
e}") # 打印意外错误信息
return False # 返回失败标志
# --- 示例如何使用 create_disk_image ---
# if __name__ == "__main__":
# # ... (与之前相同的设备打开逻辑) ...
# current_os_name = platform.system()
# device_handle = None
# source_device_path = ""
# if current_os_name == "Windows":
# source_device_path = r'\.E:' # 修改为你的U盘对应的盘符或物理驱动器
# device_handle = open_windows_device(source_device_path)
# elif current_os_name == "Linux":
# source_device_path = "/dev/sdb1" # 修改为你的U盘对应的设备路径
# device_handle = open_linux_device(source_device_path)
# # else ...
# output_image_file_path = "usb_image.dd" # 定义输出镜像文件名
# if device_handle and (device_handle != INVALID_HANDLE_VALUE if current_os_name == "Windows" else True):
# print(f"
Starting imaging process for {source_device_path} to {output_image_file_path}") # 打印开始镜像进程信息
# # 你可能需要一种方法来获取源设备的大小,以避免读取超出范围或无限循环
# # 对于Windows,可以使用DeviceIoControl与IOCTL_DISK_GET_LENGTH_INFO
# # 对于Linux,可以使用 os.fstat(device_handle.fileno()).st_size (如果适用) 或 blockdev --getsize64
# # 这里我们暂时不实现动态获取大小,假设你知道大致范围或让它读到EOF
# # device_actual_size = get_device_size_in_bytes(device_handle) # (这是一个需要你实现的函数)
# device_actual_size = None # 暂时不指定大小,让它尝试读到EOF
# success = create_disk_image(device_handle, output_image_file_path,
# sector_size=DEFAULT_SECTOR_SIZE,
# chunk_sectors=2048, # 每次读写 2048 * 512 B = 1MB
# device_size_bytes=device_actual_size)
# if success: # 如果镜像成功
# print(f"Image creation to '{output_image_file_path}' completed successfully (or with logged read errors).") # 打印镜像创建成功信息
# print("It is recommended to perform recovery operations on this image file instead of the original device.") # 建议在镜像文件上操作
# else: # 如果镜像失败
# print(f"Image creation to '{output_image_file_path}' failed.") # 打印镜像创建失败信息
# # 关闭设备
# if current_os_name == "Windows":
# close_windows_device(device_handle)
# elif current_os_name == "Linux":
# close_linux_device(device_handle)
# else:
# print("Device could not be opened. Imaging aborted.")
镜像函数解释:
create_disk_image 循环读取源设备的数据块(chunk_sectors * sector_size 字节),并将这些块写入到输出的镜像文件中。
错误处理: 如果 read_sectors_from_device 返回 None(表示读取错误,可能是坏道),它会向镜像文件写入等量的零字节,并记录错误。这是一种常见的做法,叫做“用零填充坏道”,以保持镜像的完整性和偏移量正确,但损坏区域的数据会丢失。
进度报告: 简单地每处理N个块打印一次进度。
设备大小 (device_size_bytes): 如果能预先知道设备的总大小,可以作为循环的终止条件之一。否则,函数会一直读取,直到 read_sectors_from_device 返回空字节串(表示到达设备末尾)或 None。动态获取设备大小是一个依赖于操作系统的复杂任务,这里未完全实现。
块大小 (chunk_sectors): 选择一个合适的块大小(例如1MB到64MB)可以平衡读取效率和内存使用。太小的块会导致过多的读写调用,太大的块可能消耗过多内存。
一旦我们有了打开原始设备(或其镜像文件)并从中读取数据的能力,下一步就是解析这些数据,以识别文件系统的结构、查找CHK文件内容、或者扫描已删除文件的痕迹。这将涉及到对特定文件系统(如FAT32, NTFS)的引导扇区、文件分配表、目录条目、MFT等关键数据结构的深入理解和编程解析。
掌握了这些底层的磁盘I/O操作,我们就为后续更复杂的数据恢复任务打下了坚实的基础。接下来的部分将开始聚焦于如何利用这些能力来识别和恢复CHK文件。
第四部分:CHK文件恢复实战:识别与提取
FILExxxx.CHK 文件本质上是原始数据簇的集合,它们失去了文件名和明确的文件类型信息。恢复它们的核心思想是 通过分析文件内容本身来识别其原始类型,并尝试将其重命名或转换为可用的格式。
4.1 CHK文件恢复的核心策略
主要的恢复策略可以分为以下几种,并且常常结合使用:
基于文件签名 (File Signature / Magic Number) 的识别:
许多文件类型在其文件数据的开头(有时也在文件末尾)包含特定的字节序列,称为文件签名或魔数。这些签名可以作为识别文件类型的强烈线索。
例如:
JPEG图像文件通常以 FF D8 FF E0 xx xx 4A 46 49 46 00 (ÿØÿà..JFIF.) 或 FF D8 FF E1 xx xx 45 78 69 66 00 (ÿØÿá..Exif.) 开头。
PNG图像文件以 89 50 4E 47 0D 0A 1A 0A (.PNG....) 开头。
PDF文档以 %PDF- (即 25 50 44 46 2D) 开头。
ZIP压缩文件以 PK (即 50 4B 03 04) 开头。
DOCX, XLSX, PPTX (Office Open XML) 文件本质上是ZIP压缩文件,所以也以 PK 开头,但其内部结构包含特定的XML文件可以进一步确认。
MP3文件没有统一的简单头部签名,但可能包含ID3标签 (ID3 字节序列) 或帧同步字 (例如 FFFx)。
优点: 速度快,对于具有明显签名的文件类型识别准确率高。
缺点:
并非所有文件类型都有唯一的、易于识别的签名。
签名可能很短,容易产生误判(例如,某个随机数据块恰好包含了某个短签名)。
文件可能损坏,导致签名缺失或不完整。
CHK文件可能只包含文件的一部分,如果签名在文件头部,而CHK文件是中间或尾部片段,则此方法无效。
基于文件结构和内容特征的分析:
对于某些文件类型,即使没有明确的头部签名,其内部数据结构也可能具有可识别的模式。
例如:
文本文件 (TXT, CSV, LOG, 源码等): 通常包含大量可打印的ASCII或UTF-8字符。可以通过统计字符的分布来判断。如果一个CHK文件大部分由可见字符、换行符、制表符组成,它很可能是一个文本文件。
HTML/XML文件: 包含大量的 < 和 > 标签。
数据库文件 (如SQLite): SQLite数据库文件以特定的16字节字符串 “SQLite format 300” 开头(在文件偏移0处)。
多媒体文件: MP4, MOV等视频文件有复杂的内部原子/盒子结构 (ftyp, moov, mdat等),可以尝试解析这些结构。
优点: 可以识别一些没有简单签名的文件类型,或者当签名损坏时仍可能有效。
缺点: 实现起来更复杂,需要对特定文件格式有深入了解。计算开销可能更大。
基于文件大小和簇大小的推断:
CHK文件的大小是其包含的簇的总大小。如果知道原始文件系统(例如FAT32)的每簇扇区数和每扇区字节数,就可以计算出CHK文件包含多少个簇。
对于某些文件类型,其大小可能有一定的规律。例如,非常小的CHK文件(比如只有一个簇)不太可能是大型视频文件。
这个信息本身不足以识别类型,但可以作为辅助判断或过滤条件。
启发式方法和机器学习 (高级):
可以训练机器学习模型来根据文件的字节频率、熵、n-gram模式等特征来分类文件类型。
这通常用于更高级的文件类型识别或未知文件类型的聚类。对于常规的CHK恢复,可能过于复杂。
用户辅助和上下文关联:
如果用户大致记得丢失了哪些类型的文件,或者CHK文件所在的 FOUND.xxx 文件夹的时间戳与某个特定操作(如U盘损坏)的时间相近,这些信息可以帮助缩小识别范围。
如果多个CHK文件被识别为同一类型(例如多个JPEG),并且它们的大小和内容暗示它们可能是连续的,可以尝试将它们拼接起来(这非常困难且容易出错)。
在实践中,基于文件签名的识别是最常用且效果最直接的方法,我们将首先重点实现它。
4.2 构建文件签名数据库
要进行基于签名的识别,我们需要一个文件签名数据库。这个数据库应该包含:
文件扩展名 (或类型描述): 例如 .jpg, .png, .pdf, .zip。
签名 (字节序列): 文件头部(有时也包括尾部)的魔数字节。
签名的偏移量: 签名在文件中的起始位置(通常是0)。
(可选) 签名的长度: 用于比较的字节数。
(可选) 更多验证规则: 例如,某个偏移量处必须是特定值,或者文件大小必须在某个范围内。
我们可以将这个数据库存储在一个Python字典、列表的元组、JSON文件或CSV文件中。
示例:一个简单的Python文件签名数据库 (字典形式)
# file_signatures.py (可以保存为一个单独的模块)
# 文件签名数据库
# 格式: '扩展名': [{'offset': int, 'signature': bytes, 'description': str (可选)}]
# 一个扩展名可能有多个签名变体或需要多个位置的签名来确认
FILE_SIGNATURES = {
'.jpg': [ # JPEG图像
{
'offset': 0, 'signature': b'xFFxD8xFFxE0', 'min_len': 10, 'description': 'JPEG (JFIF marker)'}, # JFIF (SOI + APP0)
{
'offset': 0, 'signature': b'xFFxD8xFFxE1', 'min_len': 10, 'description': 'JPEG (Exif marker)'}, # Exif (SOI + APP1)
{
'offset': 0, 'signature': b'xFFxD8xFFxDB', 'min_len': 4, 'description': 'JPEG (SOI + DQT marker, less common start)'} # 裸JPEG流
],
'.jpeg': [ # JPEG的另一种常见扩展名
{
'offset': 0, 'signature': b'xFFxD8xFFxE0', 'min_len': 10, 'description': 'JPEG (JFIF marker)'},
{
'offset': 0, 'signature': b'xFFxD8xFFxE1', 'min_len': 10, 'description': 'JPEG (Exif marker)'},
{
'offset': 0, 'signature': b'xFFxD8xFFxDB', 'min_len': 4, 'description': 'JPEG (SOI + DQT marker, less common start)'}
],
'.png': [ # PNG图像
{
'offset': 0, 'signature': b'x89x50x4Ex47x0Dx0Ax1Ax0A', 'description': 'PNG Image'}
],
'.gif': [ # GIF图像
{
'offset': 0, 'signature': b'x47x49x46x38x37x61', 'description': 'GIF87a Image'}, # GIF87a
{
'offset': 0, 'signature': b'x47x49x46x38x39x61', 'description': 'GIF89a Image'} # GIF89a
],
'.bmp': [ # BMP图像
{
'offset': 0, 'signature': b'x42x4D', 'description': 'BMP Image (Windows Bitmap)'} # "BM"
],
'.pdf': [ # PDF文档
{
'offset': 0, 'signature': b'x25x50x44x46x2D', 'description': 'Adobe PDF Document'} # "%PDF-"
],
'.zip': [ # ZIP压缩文件
{
'offset': 0, 'signature': b'x50x4Bx03x04', 'description': 'ZIP compressed archive (PK normally)'} # "PKx03x04"
],
'.rar': [ # RAR压缩文件
{
'offset': 0, 'signature': b'x52x61x72x21x1Ax07x00', 'description': 'RAR compressed archive (v1.5 to v4.x)'}, # Rar!
{
'offset': 0, 'signature': b'x52x61x72x21x1Ax07x01x00', 'description': 'RAR compressed archive (v5.0+)'} # Rar! (new format)
],
'.7z': [ # 7-Zip压缩文件
{
'offset': 0, 'signature': b'x37x7AxBCxAFx27x1C', 'description': '7-Zip compressed archive'} # "7z¼¯'x1c"
],
'.docx': [ # Microsoft Word (Open XML)
# DOCX, XLSX, PPTX 都是基于ZIP的,所以它们的头部签名是PK。
# 严格识别它们需要检查ZIP内部的文件,例如 [Content_Types].xml 或特定的rels文件。
# 简单起见,我们先用PK,后续可以增加更复杂的ZIP内部检查。
{
'offset': 0, 'signature': b'x50x4Bx03x04', 'description': 'Microsoft Word (DOCX - ZIP based)'}
],
'.xlsx': [ # Microsoft Excel (Open XML)
{
'offset': 0, 'signature': b'x50x4Bx03x04', 'description': 'Microsoft Excel (XLSX - ZIP based)'}
],
'.pptx': [ # Microsoft PowerPoint (Open XML)
{
'offset': 0, 'signature': b'x50x4Bx03x04', 'description': 'Microsoft PowerPoint (PPTX - ZIP based)'}
],
'.doc': [ # Microsoft Word (Legacy OLE CFB format)
{
'offset': 0, 'signature': b'xD0xC_Fx11xE0xA1xB1x1AxE1', 'description': 'Microsoft Word (DOC - OLE CFB)'} # OLE Compound File Binary Format
],
'.xls': [ # Microsoft Excel (Legacy OLE CFB format)
{
'offset': 0, 'signature': b'xD0xC_Fx11xE0xA1xB1x1AxE1', 'description': 'Microsoft Excel (XLS - OLE CFB)'}
],
'.ppt': [ # Microsoft PowerPoint (Legacy OLE CFB format)
{
'offset': 0, 'signature': b'xD0xC_Fx11xE0xA1xB1x1AxE1', 'description': 'Microsoft PowerPoint (PPT - OLE CFB)'}
],
'.mp3': [ # MP3音频 (ID3v2 tag is common at the beginning)
{
'offset': 0, 'signature': b'x49x44x33', 'description': 'MP3 Audio (ID3v2 tag)'} # "ID3"
# MP3帧同步字通常是 11个1 (FFF) 开头,但变化较多,较难作为唯一头部签名
],
'.wav': [ # WAV音频
{
'offset': 0, 'signature': b'x52x49x46x46', 'min_len': 12, 'description': 'WAV Audio (RIFF header)'}, # "RIFF"
# 并且在偏移量8处通常是 "WAVE" (b'x57x41x56x45')
],
'.avi': [ # AVI视频
{
'offset': 0, 'signature': b'x52x49x46x46', 'min_len': 12, 'description': 'AVI Video (RIFF header)'}, # "RIFF"
# 并且在偏移量8处通常是 "AVI " (b'x41x56x49x20')
],
'.mp4': [ # MP4视频
# MP4文件通常以一个包含 "ftyp" 的box开始,但 "ftyp" 不在文件最开头。
# 第一个box的size (4字节) + 'ftyp' (4字节) = 偏移量4处是 'ftyp'
# 或者其他常见的box类型如 'moov', 'mdat'
# 这里我们简化,检查偏移4处的'ftyp'
# 注意:一个CHK文件如果很小,可能不包含这个签名。
{
'offset': 4, 'signature': b'x66x74x79x70', 'min_len': 8, 'description': 'MP4 Video (ftyp box identifier)'} # "ftyp"
],
'.mov': [ # MOV视频 (QuickTime)
{
'offset': 4, 'signature': b'x66x74x79x70', 'min_len': 8, 'description': 'MOV Video (ftyp box identifier)'}, # 通常以 "ftyp" 开头
{
'offset': 4, 'signature': b'x6Dx6Fx6Fx76', 'min_len': 8, 'description': 'MOV Video (moov box identifier)'}, # 或者 "moov"
{
'offset': 4, 'signature': b'x6Dx64x61x74', 'min_len': 8, 'description': 'MOV Video (mdat box identifier)'} # 或者 "mdat" (数据部分,不一定在头部)
],
'.exe': [ # Windows可执行文件
{
'offset': 0, 'signature': b'x4Dx5A', 'description': 'Windows Executable (MZ header)'} # "MZ"
],
'.dll': [ # Windows动态链接库
{
'offset': 0, 'signature': b'x4Dx5A', 'description': 'Windows DLL (MZ header)'} # "MZ"
],
'.sqlite': [ # SQLite数据库文件
{
'offset': 0, 'signature': b'x53x51x4Cx69x74x65x20x66x6Fx72x6Dx61x74x20x33x00', 'description': 'SQLite Database file'}
],
# 可以根据需要不断扩展这个数据库...
# 例如文本文件,可以通过内容分析而不是固定签名
}
# 可以添加一些辅助信息,比如哪些类型是基于ZIP的,需要进一步检查内部
ZIP_BASED_EXTENSIONS = ['.docx', '.xlsx', '.pptx'] # 基于ZIP的扩展名列表
# 对于一些文件类型,除了头部签名,尾部签名也很重要,或者需要更复杂的验证逻辑
# 例如,JPEG文件以 FF D9 结尾。
# PNG文件包含IEND块。
# ZIP文件有中央目录结构在文件尾部。
def get_max_signature_length(): # 定义获取最大签名长度的函数
"""计算数据库中所有签名所需检查的最大字节数。"""
max_len = 0 # 初始化最大长度为0
for ext_signatures in FILE_SIGNATURES.values(): # 遍历所有扩展名的签名列表
for sig_info in ext_signatures: # 遍历每个签名信息
current_check_len = sig_info['offset'] + len(sig_info['signature']) # 计算当前检查长度 = 偏移量 + 签名长度
if current_check_len > max_len: # 如果当前检查长度大于最大长度
max_len = current_check_len # 更新最大长度
return max_len if max_len > 0 else 256 # 返回最大长度,如果为0则返回一个默认值 (例如256)
数据库说明:
使用字典,键是文件扩展名(例如 .jpg)。
每个扩展名对应一个列表,因为某些文件类型可能有多种有效的起始签名(例如JPEG的JFIF和Exif)。
列表中的每个元素是一个字典,包含:
'offset': 签名在文件中的字节偏移量。
'signature': 实际的签名字节序列 (使用 b'' 表示字节串)。
'min_len' (可选): CHK文件至少需要多长才能包含这个完整的签名。如果CHK文件比 offset + len(signature) 还短,那么肯定不匹配。如果CHK文件比min_len短,也可能不匹配(例如,一个完整的JPEG文件通常不会只有几个字节)。
'description' (可选): 对签名的简要描述。
这个数据库可以不断扩充和完善。互联网上有许多关于文件签名的资源(例如 Gary Kessler’s File Signatures Table)。
4.3 实现CHK文件识别器
现在我们可以编写一个Python函数,它接受一个CHK文件的路径,读取其头部数据,并与签名数据库进行比较。
import os # 导入os模块
# 假设 file_signatures.py 和上面的 FILE_SIGNATURES 数据库在同一目录或已导入
# from file_signatures import FILE_SIGNATURES, get_max_signature_length # 如果在不同文件
MAX_HEADER_READ_SIZE = get_max_signature_length() # 获取需要读取的最大头部字节数,基于签名数据库
def identify_chk_file_type(chk_file_path): # 定义识别CHK文件类型的函数
"""
尝试通过文件签名识别CHK文件的类型。
:param chk_file_path: str, CHK文件的完整路径。
:return: str, 推断出的文件扩展名 (例如 '.jpg', '.pdf');如果无法识别则返回None。
"""
if not os.path.exists(chk_file_path): # 如果文件不存在
# logger.error(f"CHK file not found: {chk_file_path}") # 记录错误日志
print(f"Error: CHK file not found at {
chk_file_path}") # 打印错误信息
return None # 返回None
if os.path.getsize(chk_file_path) == 0: # 如果文件大小为0
# logger.info(f"CHK file is empty: {chk_file_path}") # 记录参考日志
# print(f"Info: CHK file {chk_file_path} is empty.") # 打印参考信息
return ".empty" # 可以返回一个特殊标记表示空文件
try:
with open(chk_file_path, 'rb') as f_chk: # 以二进制读取模式打开CHK文件
header_data = f_chk.read(MAX_HEADER_READ_SIZE) # 读取文件头部数据,最大长度为签名数据库所需
if not header_data: # 如果读取不到数据 (理论上前面已判断非空)
return None # 返回None
for extension, signatures_list in FILE_SIGNATURES.items(): # 遍历签名数据库中的每个扩展名及其签名列表
for sig_info in signatures_list: # 遍历该扩展名的每个签名信息
offset = sig_info['offset'] # 获取签名偏移量
signature_bytes = sig_info['signature'] # 获取签名字节序列
sig_len = len(signature_bytes) # 获取签名长度
min_file_len_for_sig = offset + sig_len # 计算包含此签名所需的文件最小长度
if len(header_data) >= min_file_len_for_sig: # 如果读取到的头部数据足够长以包含此签名
# 从头部数据中提取对应偏移和长度的片段进行比较
file_segment_to_check = header_data[offset : offset + sig_len] # 提取待检查的文件片段
if file_segment_to_check == signature_bytes: # 如果文件片段与签名匹配
# 初步匹配成功!
# (可选) 进行更严格的检查,比如文件大小下限
min_overall_len = sig_info.get('min_len', 0) # 获取该签名定义的最小文件总长度 (默认为0)
if os.path.getsize(chk_file_path) >= min_overall_len: # 如果文件实际大小满足最小长度要求
# logger.info(f"File {chk_file_path} identified as {extension} based on: {sig_info.get('description', 'N/A')}") # 记录日志
return extension # 返回识别出的扩展名
# else:
# logger.debug(f"Signature match for {extension} on {chk_file_path}, but file too short ({os.path.getsize(chk_file_path)} < {min_overall_len}).")
# 文件太短,可能不是这个类型,或者是个损坏的文件,继续尝试其他签名
pass # 继续尝试其他签名
# 如果所有签名都不匹配,可以尝试基于内容的启发式方法
# 例如,检查是否为文本文件
if is_likely_text_file(header_data, chk_file_path): # 调用函数判断是否可能为文本文件
return ".txt" # 如果是,返回.txt
except IOError as e: # 捕获IO错误
# logger.error(f"IOError reading CHK file {chk_file_path}: {e}", exc_info=True) # 记录错误日志
print(f"Error reading CHK file {
chk_file_path}: {
e}") # 打印错误信息
except Exception as e: # 捕获其他异常
# logger.error(f"Unexpected error processing CHK file {chk_file_path}: {e}", exc_info=True) # 记录错误日志
print(f"Unexpected error processing CHK file {
chk_file_path}: {
e}") # 打印错误信息
return None # 如果无法识别,返回None
def is_likely_text_file(data_sample, file_path_for_size=None, threshold_ratio=0.85, sample_size=1024): # 定义判断是否可能为文本文件的函数
"""
启发式地判断一段数据是否可能是文本文件。
:param data_sample: bytes, 要检查的数据样本 (通常是文件头部)。
:param file_path_for_size: str, (可选) 文件路径,用于获取文件总大小以决定是否完整读取。
:param threshold_ratio: float, 可打印ASCII字符(加常见空白符)的比例阈值。
:param sample_size: int, 如果提供了file_path_for_size且文件较大,则读取这么大的样本。
:return: bool, True如果是文本文件,False否则。
"""
if not data_sample: # 如果数据样本为空
return False # 返回False
# 如果样本较小,但提供了文件路径,可以尝试读取更多内容来判断
if file_path_for_size and len(data_sample) < sample_size: # 如果样本长度小于指定大小且提供了文件路径
try:
file_size = os.path.getsize(file_path_for_size) # 获取文件大小
read_len = min(sample_size, file_size) # 计算要读取的长度
if read_len > len(data_sample): # 如果要读取的长度大于当前样本长度
with open(file_path_for_size, 'rb') as f_full: # 以二进制读取模式打开文件
data_sample = f_full.read(read_len) # 读取指定长度的数据作为新样本
except: # 捕获任何异常
pass # 忽略读取完整样本的错误,继续使用现有样本
if not data_sample: # 再次检查样本是否为空 (可能读取失败)
return False # 返回False
num_printable = 0 # 初始化可打印字符计数
num_control_or_binary = 0 # 初始化控制或二进制字符计数
# 定义可接受的文本字符范围 (ASCII可打印字符 + 常见空白符)
# chr(9) = TAB, chr(10) = LF, chr(13) = CR
# 可打印ASCII范围是 32 (space) 到 126 (~)
# 我们也考虑常见的UTF-8编码,但简单起见,先主要基于ASCII范围和常见空白符
# 更复杂的文本检测会考虑Unicode块、字符频率等。
for byte_val in data_sample: # 遍历样本中的每个字节
if 32 <= byte_val <= 126 or byte_val in [9, 10, 13]: # 如果字节是ASCII可打印字符或TAB/LF/CR
num_printable += 1 # 可打印字符计数加1
elif byte_val == 0: # NULL字节通常在二进制文件中更常见,文本文件较少(除非是UTF-16/32的填充)
# 对于UTF-8文本,0字节是不合法的。
# 简单处理:如果NULL字节过多,可能不是纯文本。
num_control_or_binary += 0.5 # 给NULL字节一个较小的惩罚 (可调整)
elif byte_val < 32 or byte_val > 126: # 其他控制字符或扩展ASCII/二进制数据
num_control_or_binary += 1 # 控制或二进制字符计数加1
total_chars = len(data_sample) # 获取样本总长度
if total_chars == 0: # 如果总长度为0
return False # 返回False
# 计算可打印字符的比例
printable_ratio = num_printable / total_chars # 计算可打印字符比例
# logger.debug(f"Text check for sample (len {total_chars}): printable_ratio={printable_ratio:.2f}") # 记录文本检查日志 (调试用)
if printable_ratio >= threshold_ratio: # 如果可打印字符比例大于等于阈值
# 可以进一步检查是否有过多的重复非文本字符,或者非常长的非文本序列
# 但作为初步判断,这个比例已经比较有用了
return True # 返回True
return False # 返回False
# --- 主处理逻辑:遍历FOUND.xxx文件夹,识别并重命名/复制CHK文件 ---
def process_found_folder(found_folder_path, output_renamed_folder=None, copy_files=True): # 定义处理FOUND文件夹的函数
"""
遍历FOUND.xxx文件夹中的CHK文件,尝试识别它们,并重命名或复制到新位置。
:param found_folder_path: str, FOUND.xxx 文件夹的路径。
:param output_renamed_folder: str, (可选) 用于存放重命名/复制后文件的输出文件夹路径。
如果为None,则在原地重命名 (风险较高)。
:param copy_files: bool, 如果为True且提供了output_renamed_folder,则复制文件;否则移动/重命名。
"""
if not os.path.isdir(found_folder_path): # 如果路径不是文件夹
print(f"Error: '{
found_folder_path}' is not a valid directory.") # 打印错误信息
# logger.error(f"'{found_folder_path}' is not a directory.") # 记录错误日志
return # 直接返回
if output_renamed_folder: # 如果指定了输出文件夹
if not os.path.exists(output_renamed_folder): # 如果输出文件夹不存在
try:
os.makedirs(output_renamed_folder) # 创建输出文件夹 (包括任何必要的父目录)
print(f"Created output directory: {
output_renamed_folder}") # 打印创建输出目录信息
# logger.info(f"Created output directory: {output_renamed_folder}") # 记录日志
except OSError as e: # 捕获OS错误
print(f"Error creating output directory '{
output_renamed_folder}': {
e}") # 打印创建目录错误信息
# logger.error(f"Failed to create output dir '{output_renamed_folder}': {e}", exc_info=True) # 记录错误日志
return # 直接返回
elif not os.path.isdir(output_renamed_folder): # 如果输出路径存在但不是文件夹
print(f"Error: Output path '{
output_renamed_folder}' exists but is not a directory.") # 打印错误信息
return # 直接返回
processed_count = 0 # 初始化已处理文件计数
identified_count = 0 # 初始化已识别文件计数
unknown_count = 0 # 初始化未知文件计数
print(f"
Processing CHK files in: {
found_folder_path}") # 打印正在处理的文件夹信息
if output_renamed_folder: # 如果指定了输出文件夹
print(f"Recovered files will be {
'copied' if copy_files else 'moved'} to: {
output_renamed_folder}") # 打印恢复文件存放位置信息
else:
print("Warning: Files will be renamed in place. This is risky. Consider specifying an output folder.") # 打印原地重命名警告
for filename in os.listdir(found_folder_path): # 遍历文件夹中的所有文件名
if filename.upper().startswith("FILE") and filename.upper().endswith(".CHK"): # 如果文件名以FILE开头且以.CHK结尾
chk_file_full_path = os.path.join(found_folder_path, filename) # 构造CHK文件的完整路径
processed_count += 1 # 已处理文件计数加1
# print(f"Identifying: {filename}...") # 打印正在识别的文件名 (调试用,会很多)
# logger.debug(f"Attempting to identify: {chk_file_full_path}") # 记录日志
identified_ext = identify_chk_file_type(chk_file_full_path) # 调用函数识别CHK文件类型
if identified_ext: # 如果成功识别出扩展名
identified_count += 1 # 已识别文件计数加1
base_name = os.path.splitext(filename)[0] # 获取文件名(不含扩展名)
new_filename = f"{
base_name}{
identified_ext}" # 构造新的文件名
if output_renamed_folder: # 如果指定了输出文件夹
new_file_full_path = os.path.join(output_renamed_folder, new_filename) # 构造新文件的完整路径
# 防止文件名冲突 (虽然CHK文件名通常唯一,但重命名后可能与之前恢复的冲突)
counter = 1 # 初始化计数器
temp_new_path = new_file_full_path # 初始化临时新路径
temp_base, temp_ext = os.path.splitext(new_filename) # 分离基本名和扩展名
while os.path.exists(temp_new_path): # 如果新路径已存在
temp_new_path = os.path.join(output_renamed_folder, f"{
temp_base}_{
counter}{
temp_ext}") # 构造新的临时路径 (添加序号)
counter += 1 # 计数器加1
new_file_full_path = temp_new_path # 更新新文件完整路径
try:
if copy_files: # 如果是复制文件
import shutil # 导入shutil模块
shutil.copy2(chk_file_full_path, new_file_full_path) # 复制文件 (copy2会尝试保留元数据)
print(f" Identified: {
filename} -> Copied as: {
os.path.basename(new_file_full_path)}") # 打印识别并复制信息
# logger.info(f"Identified {chk_file_full_path} as {identified_ext}, copied to {new_file_full_path}") # 记录日志
else: # 如果是移动/重命名文件
os.rename(chk_file_full_path, new_file_full_path) # 重命名/移动文件
print(f" Identified: {
filename} -> Renamed/Moved as: {
os.path.basename(new_file_full_path)}") # 打印识别并重命名/移动信息
# logger.info(f"Identified {chk_file_full_path} as {identified_ext}, moved to {new_file_full_path}") # 记录日志
except Exception as e: # 捕获异常
print(f" Error processing {
filename} to {
os.path.basename(new_file_full_path)}: {
e}") # 打印处理错误信息
# logger.error(f"Failed to copy/move {chk_file_full_path} to {new_file_full_path}: {e}", exc_info=True) # 记录错误日志
identified_count -=1 # 因为处理失败,从已识别计数中减去
unknown_count +=1 # 增加未知文件计数
else: # 如果没有指定输出文件夹 (原地重命名)
new_file_full_path_inplace = os.path.join(found_folder_path, new_filename) # 构造原地重命名的新路径
# 原地重命名也需要处理潜在的命名冲突,尽管概率小
if chk_file_full_path.upper() != new_file_full_path_inplace.upper(): # 只有当新旧文件名不同时才重命名
counter = 1
temp_new_path_inplace = new_file_full_path_inplace
temp_base_inplace, temp_ext_inplace = os.path.splitext(new_filename)
while os.path.exists(temp_new_path_inplace):
temp_new_path_inplace = os.path.join(found_folder_path, f"{
temp_base_inplace}_{
counter}{
temp_ext_inplace}")
counter += 1
new_file_full_path_inplace = temp_new_path_inplace
try:
os.rename(chk_file_full_path, new_file_full_path_inplace) # 原地重命名
print(f" Identified: {
filename} -> Renamed in place as: {
os.path.basename(new_file_full_path_inplace)}") # 打印识别并原地重命名信息
# logger.info(f"Identified {chk_file_full_path} as {identified_ext}, renamed in place to {new_file_full_path_inplace}")
except Exception as e:
print(f" Error renaming {
filename} in place: {
e}") # 打印原地重命名错误信息
# logger.error(f"Failed to rename {chk_file_full_path} in place: {e}", exc_info=True)
identified_count -=1
unknown_count +=1
else: # 如果新旧文件名相同 (例如,一个txt文件被识别为txt)
print(f" Identified: {
filename} as {
identified_ext} (no rename needed).") # 打印识别但无需重命名信息
else: # 如果无法识别文件类型
unknown_count += 1 # 未知文件计数加1
print(f" Unknown type: {
filename} (size: {
os.path.getsize(chk_file_full_path)} bytes)") # 打印未知类型信息
# logger.info(f"Could not identify type for {chk_file_full_path}") # 记录日志
print(f"
--- Processing Summary for {
found_folder_path} ---") # 打印处理摘要信息
print(f"Total CHK files found: {
processed_count}") # 打印找到的CHK文件总数
print(f"Files identified and processed: {
identified_count}") # 打印已识别并处理的文件数
print(f"Files of unknown type: {
unknown_count}") # 打印未知类型文件数
if output_renamed_folder: # 如果指定了输出文件夹
print(f"Check '{
output_renamed_folder}' for recovered files.") # 提示检查输出文件夹
else:
print("Identified files (if any) were renamed in the original FOUND folder.") # 提示已识别文件在原文件夹中重命名
# --- 主程序入口示例 ---
# if __name__ == "__main__":
# # 配置日志 (假设 setup_main_logger 已经定义)
# # logger = setup_main_logger(log_file_path="chk_recovery.log")
# # logger.info("--- CHK File Recovery Script Started ---")
# # !! 修改为你U盘上实际的FOUND.xxx文件夹路径 !!
# # 例如: "E:\FOUND.000" (Windows) 或 "/mnt/usb_drive/FOUND.000" (Linux)
# target_found_folder = "E:\FOUND.000" # Windows示例路径
# # target_found_folder = "/media/user/USB_STICK/FOUND.000" # Linux示例路径
# # !! 修改为你希望保存恢复文件的输出文件夹路径 !!
# # 强烈建议使用与源不同的驱动器或分区
# output_recovery_path = "D:\Recovered_CHK_Files" # Windows示例路径
# # output_recovery_path = "/home/user/recovered_chks" # Linux示例路径
# if not os.path.exists(target_found_folder): # 如果目标文件夹不存在
# print(f"Error: The specified FOUND folder does not exist: {target_found_folder}") # 打印错误信息
# print("Please ensure CHKDSK has run and you have the correct path to a FOUND.xxx folder.") # 提示检查路径
# # logger.critical(f"Target FOUND folder not found: {target_found_folder}")
# else:
# process_found_folder(target_found_folder, output_recovery_path, copy_files=True) # 处理文件夹,复制文件
# # 或者,如果你想在原地重命名 (非常不推荐,除非你已备份FOUND文件夹):
# # process_found_folder(target_found_folder, output_renamed_folder=None)
# # logger.info("--- CHK File Recovery Script Finished ---")
代码解释与改进方向:
identify_chk_file_type(chk_file_path):
打开CHK文件,读取其头部的一小部分数据 (MAX_HEADER_READ_SIZE,这个大小由签名数据库中所有签名所需检查的最大偏移+长度决定,以避免不必要地读取整个大文件)。
遍历 FILE_SIGNATURES 数据库中的每个已知类型和对应的签名。
对于每个签名,检查CHK文件的头部数据在指定偏移量处是否与该签名字节序列匹配。
增加了对 sig_info.get('min_len', 0) 的检查,如果签名信息中定义了文件类型的最小合理长度,会进一步验证CHK文件大小是否满足,这有助于排除一些因签名过短而导致的误判(例如,一个只有10字节的CHK文件不太可能是一个完整的JPEG,即使它恰好以JPEG的SOI标记开头)。
如果所有已知签名都不匹配,它会调用 is_likely_text_file() 来尝试判断是否为文本文件。
is_likely_text_file(data_sample, ...):
一个简单的启发式函数,通过计算数据样本中可打印ASCII字符(加上TAB、LF、CR)的比例来判断是否为文本。
如果样本过小但提供了完整文件路径,它会尝试读取文件开头更多的数据(最多 sample_size 字节)来进行更准确的判断。
threshold_ratio (默认为0.85) 是一个可调参数,表示可打印字符至少要占多大比例才被认为是文本。
改进: 这个文本检测器非常基础。更高级的文本检测可以考虑:
Unicode字符集的检测(例如,检查UTF-8编码的有效性)。
常见非ASCII语言的字符频率分析。
排除包含过多连续控制字符或二进制特征的情况。
使用更复杂的统计模型或库(如 chardet 库可以检测字符编码,但它本身不直接判断是否为“文本内容”)。
process_found_folder(...):
遍历指定的 FOUND.xxx 文件夹中的所有文件。
对每个以 FILE 开头且以 .CHK 结尾的文件调用 identify_chk_file_type()。
如果识别成功:
根据 output_renamed_folder 和 copy_files 参数决定是复制文件到新位置并重命名,还是在原地重命名。强烈推荐使用复制到新位置的方式,以避免在原始 FOUND 文件夹中操作引入风险。
在构造新文件名时,会检查目标位置是否已存在同名文件,如果存在,则在文件名后添加序号(例如 FILE0001_1.jpg)以避免覆盖。
使用 shutil.copy2() 进行复制,它会尝试保留文件的元数据(如时间戳),而 os.rename() 用于移动/重命名。
打印处理过程的摘要信息。
日志: 代码中用 # logger.xxx(...) 注释的形式给出了一些日志记录的建议位置。在实际工具中,应使用之前讨论的 logging 模块来记录详细的操作、识别结果和错误。
MAX_HEADER_READ_SIZE: 通过 get_max_signature_length() 动态计算,确保我们至少读取了足够的数据来检查数据库中最长的或偏移最远的签名。
进一步的增强和细化策略:
更完善的文件签名数据库:
不断添加更多文件类型的签名。
为某些文件类型增加尾部签名 (Footer Signature) 的检查。例如,JPEG文件以 FF D9 结尾。PNG有 IEND 块。这可以作为双重验证,提高准确性。
对于基于容器的格式 (如 ZIP, OLE CFB, MP4/MOV),在识别出容器签名后,可以尝试解析容器内部的一些关键结构来进一步确认具体类型(例如,检查ZIP文件中是否有 word/document.xml 来确认是DOCX)。这需要更复杂的解析逻辑。
分阶段识别:
第一阶段:快速签名扫描: 使用明确且唯一的头部签名进行快速筛选。
第二阶段:结构化分析: 对于第一阶段未识别或识别为通用容器类型的文件,进行更深入的内容结构分析。例如,对于识别为ZIP的文件,尝试解压(到内存或临时位置)并检查其内部清单文件。
第三阶段:启发式文本/二进制分类: 对仍然未识别的文件,使用更通用的启发式方法区分文本和二进制,或者尝试基于熵、字节频率等进行粗略分类。
处理文件碎片:
当前的签名识别主要针对CHK文件包含文件头部的情况。如果CHK文件是文件的中间或尾部片段,则头部签名法会失效。
对于某些特定类型的文件(如JPEG包含多个可独立解码的扫描数据段,文本文件每行都是独立的),即使是片段也可能包含可识别的模式或部分有用的数据。
高级数据恢复技术中的文件雕刻 (File Carving) 会扫描整个数据区(而不仅仅是CHK文件)来查找文件头部和尾部,并尝试提取它们之间的所有数据。这对于恢复碎片化文件更有效,但实现起来也更复杂。当处理CHK文件时,如果一个CHK文件没有头部签名,但其内容符合某个文件类型的内部结构特征,也可以认为是一种片段恢复。
大小一致性检查:
某些文件格式在其头部包含文件总大小的信息(例如,很多RIFF格式如WAV/AVI,ZIP文件的中央目录等)。如果CHK文件识别为这类文件,可以尝试读取这个内部大小,并与CHK文件自身的实际大小进行比较。如果相差很大,可能表示CHK文件只是一个片段,或者识别有误。
用户交互和反馈:
对于难以自动识别的文件,可以提供一个界面或选项,让用户手动指定可能的类型或提供线索。
在重命名/复制后,可以提示用户尝试打开这些文件,并反馈恢复的质量。
并行处理:
如果 FOUND.xxx 文件夹中CHK文件数量非常多,可以考虑使用多进程或多线程来并行处理文件识别和复制操作,以加快速度。每个文件通常是独立处理的,适合并行化。但要注意Python的全局解释器锁 (GIL) 对CPU密集型纯Python代码并行效果的限制,对于IO密集型操作(如文件读写)或使用外部库(如某些C实现的解析器),并行效果会更好。
通过不断迭代和完善这些策略,我们可以构建一个功能越来越强大的CHK文件恢复工具。
第五部分:误删文件恢复实战
当用户在操作系统中“删除”一个文件时,例如将其移入回收站并清空,或者使用 Shift + Delete 直接删除,文件系统并不会立即将文件数据从磁盘上物理擦除。它主要做的是修改文件系统中的元数据,将文件占用的空间标记为可用,并将文件从目录列表中“移除”。只要这些原始数据所在的磁盘区域没有被新的数据写入覆盖,理论上它们就仍然可以被恢复。
误删文件恢复主要有两种策略,它们可以独立使用,也可以结合使用:
基于文件系统元数据的恢复 (Filesystem-level Recovery):
这种方法尝试利用文件系统中残留的关于已删除文件的信息(如目录条目、MFT记录、FAT表项等)来定位和恢复文件。
如果元数据损坏不严重,这种方法可以恢复文件名、目录结构、原始文件大小以及文件的簇分配信息(即使文件是碎片化的)。
对不同的文件系统(FAT32, NTFS, exFAT等),其元数据结构和删除标记方式不同,因此需要针对性的解析。
优点: 能较好地恢复文件的完整结构和元数据。
缺点: 严重依赖于文件系统元数据的完整性。如果元数据被严重破坏或部分覆盖,则效果不佳。
基于内容的数据雕刻 (Data Carving / File Carving):
这种方法不依赖于文件系统的元数据,而是直接扫描磁盘的原始数据区域(包括未分配的空间),查找已知文件类型的特定数据模式,主要是文件头部签名 (Header Signature) 和可能的尾部签名 (Footer Signature)。
当找到一个头部签名时,它会尝试提取从该头部开始到假定的文件结束(可能由尾部签名确定,或由文件大小、下一个文件头部等启发式规则确定)之间的数据块。
优点: 即使文件系统元数据完全丢失或损坏,只要文件数据本身还存在(至少是头部),就有可能恢复出文件。对于从未知文件系统或完全损坏的文件系统中恢复数据特别有用。
缺点:
通常只能恢复出文件的原始数据内容,而文件名、原始路径、时间戳等元数据会丢失(恢复出来的文件通常会被重新命名,如 file0001.jpg)。
对于碎片化 (Fragmented) 的文件,简单的数据雕刻(只查找连续的头部和尾部)效果很差,因为它无法将分散在磁盘不同位置的文件片段正确地拼接起来。高级雕刻技术会尝试处理碎片,但非常复杂。
容易产生误报(例如,在一个大文件中找到另一个小文件的签名)。
确定文件的结束位置可能很困难,特别是对于没有明确尾部签名的文件类型,或者当尾部被覆盖时。
5.1 基于文件系统元数据的恢复:以FAT32为例
我们先以相对简单的FAT32文件系统为例,探讨如何尝试恢复已删除文件。
回顾一下FAT32文件删除时发生的事情:
目录条目的第一个字节被修改为 0xE5。
该文件在FAT表中所占用的簇链通常会被清零(标记为空闲)。
恢复的关键步骤:
扫描目录结构: 遍历所有目录(包括根目录和子目录)中的目录条目。
查找已删除标记: 寻找第一个字节为 0xE5 的目录条目。
提取元数据: 从这些标记为已删除的目录条目中,我们仍然可以读取到:
原始文件名的剩余部分(从第二个字节开始)。
文件属性(例如,它是文件还是子目录)。
原始的文件大小。
原始文件数据开始的第一个簇的簇号。
时间戳等信息(可能部分有效)。
恢复簇链 (挑战点):
如果文件是非碎片化的: 并且我们知道它占用了多少个簇(可以通过文件大小和每簇字节数计算出来),那么我们可以从起始簇号开始,连续读取相应数量的簇来恢复文件。这是最理想的情况。
如果文件是碎片化的: 由于FAT链已被清除,我们无法直接知道下一个簇在哪里。这时,恢复会变得非常困难。一些可能的(但不完美的)策略包括:
假设连续性: 尝试按起始簇号连续读取,如果文件内容看起来合理(例如,对于JPEG,连续的图像数据),则继续。但如果遇到其他文件的签名或明显不相关的数据,则停止。
扫描空闲簇: 在FAT表中查找所有标记为空闲的簇,并尝试将它们与已恢复的文件片段进行“智能”拼接(例如,基于文件类型内部的连续性特征)。这非常复杂且容易出错。
依赖数据雕刻辅助: 如果能从目录条目知道文件名和大致类型,可以结合数据雕刻来寻找匹配的内容块。
数据提取: 根据找到的簇信息,从数据区读取相应的数据并保存为新文件。
5.1.1 解析FAT32引导扇区 (获取文件系统参数)
在进行任何FAT32恢复之前,我们必须首先解析其引导扇区(DBR – DOS Boot Record),以获取关键的文件系统参数。这些参数告诉我们如何解释FAT表、目录和数据区。
import struct # 导入struct模块,用于处理C结构体和二进制数据打包/解包
# (假设我们已经有了上一部分定义的 read_sectors_from_device 函数)
# (以及 open_windows_device / open_linux_device, close_windows_device / close_linux_device)
def parse_fat32_boot_sector(boot_sector_bytes): # 定义解析FAT32引导扇区的函数
"""
解析FAT32引导扇区的字节数据。
:param boot_sector_bytes: bytes, 引导扇区的内容 (通常是512字节)。
:return: dict, 包含解析出的参数的字典;如果不是有效的FAT32引导扇区则返回None。
"""
if len(boot_sector_bytes) < 90: # FAT32引导扇区至少需要这么多字节来包含所有关键字段
print("Error: Boot sector data too short for FAT32.") # 打印引导扇区数据过短错误
return None # 返回None
params = {
} # 初始化参数字典
# 引导扇区结构 (只解析我们需要的部分)
# '<' 表示小端字节序
# 'B' unsigned char (1 byte), 'H' unsigned short (2 bytes), 'I' unsigned int (4 bytes)
# '3s' 3-char string, '8s' 8-char string
try:
# --- BPB (BIOS Parameter Block) ---
params['jump_boot'] = boot_sector_bytes[0:3] # 跳转指令 (3字节)
params['oem_name'] = boot_sector_bytes[3:11].decode('ascii', errors='replace').strip() # OEM名称 (8字节)
params['bytes_per_sector'] = struct.unpack_from('<H', boot_sector_bytes, 11)[0] # 每扇区字节数 (2字节,偏移11)
params['sectors_per_cluster'] = struct.unpack_from('<B', boot_sector_bytes, 13)[0] # 每簇扇区数 (1字节,偏移13)
params['reserved_sector_count'] = struct.unpack_from('<H', boot_sector_bytes, 14)[0] # 保留扇区数 (2字节,偏移14)
params['num_fats'] = struct.unpack_from('<B', boot_sector_bytes, 16)[0] # FAT表数量 (1字节,偏移16)
params['root_entry_count'] = struct.unpack_from('<H', boot_sector_bytes, 17)[0] # 根目录条目数 (FAT12/16用,FAT32此值为0)
if params['root_entry_count'] != 0: # 如果根目录条目数不为0
print("Warning: Root entry count is non-zero, this might not be a FAT32 BPB (or a very old one).") # 打印警告
# 对于真正的FAT32,这个字段必须是0
params['total_sectors_16'] = struct.unpack_from('<H', boot_sector_bytes, 19)[0] # 16位总扇区数 (FAT12/16用,FAT32此值为0)
params['media_type'] = struct.unpack_from('<B', boot_sector_bytes, 21)[0] # 介质类型 (1字节,偏移21)
params['sectors_per_fat_16'] = struct.unpack_from('<H', boot_sector_bytes, 22)[0] # 16位每FAT扇区数 (FAT12/16用,FAT32此值为0)
# --- Extended BPB (FAT32部分从这里开始更重要) ---
params['sectors_per_track'] = struct.unpack_from('<H', boot_sector_bytes, 24)[0] # 每磁道扇区数 (2字节,偏移24)
params['num_heads'] = struct.unpack_from('<H', boot_sector_bytes, 26)[0] # 磁头数 (2字节,偏移26)
params['hidden_sector_count'] = struct.unpack_from('<I', boot_sector_bytes, 28)[0] # 隐藏扇区数 (4字节,偏移28)
params['total_sectors_32'] = struct.unpack_from('<I', boot_sector_bytes, 32)[0] # 32位总扇区数 (4字节,偏移32)
# --- FAT32 Extended BPB ---
params['sectors_per_fat_32'] = struct.unpack_from('<I', boot_sector_bytes, 36)[0] # 32位每FAT扇区数 (4字节,偏移36)
params['ext_flags'] = struct.unpack_from('<H', boot_sector_bytes, 40)[0] # 扩展标志 (2字节,偏移40)
params['fs_version'] = struct.unpack_from('<H', boot_sector_bytes, 42)[0] # 文件系统版本 (2字节,偏移42)
params['root_cluster'] = struct.unpack_from('<I', boot_sector_bytes, 44)[0] # 根目录起始簇号 (4字节,偏移44)
params['fsinfo_sector'] = struct.unpack_from('<H', boot_sector_bytes, 48)[0] # FSInfo扇区号 (2字节,偏移48)
params['backup_boot_sector'] = struct.unpack_from('<H', boot_sector_bytes, 50)[0] # 备份引导扇区号 (2字节,偏移50)
# params['reserved_0'] = boot_sector_bytes[52:64] # 保留 (12字节)
params['drive_number'] = struct.unpack_from('<B', boot_sector_bytes, 64)[0] # 驱动器号 (1字节,偏移64)
# params['reserved_1'] = boot_sector_bytes[65] # 保留 (1字节)
params['boot_signature_ext'] = struct.unpack_from('<B', boot_sector_bytes, 66)[0] # 扩展引导签名 (1字节,偏移66, 应为0x29)
params['volume_id'] = struct.unpack_from('<I', boot_sector_bytes, 67)[0] # 卷序列号 (4字节,偏移67)
params['volume_label'] = boot_sector_bytes[71:82].decode('ascii', errors='replace').strip() # 卷标 (11字节)
params['filesystem_type_str'] = boot_sector_bytes[82:90].decode('ascii', errors='replace').strip() # 文件系统类型字符串 (8字节)
# 检查签名 (0x55AA at offset 510)
if struct.unpack_from('<H', boot_sector_bytes, 510)[0] != 0xAA55: # 如果引导扇区签名不匹配
print("Error: Boot sector signature (0xAA55) not found at offset 510.") # 打印签名未找到错误
return None # 返回None
# 一些重要的计算值
if params['bytes_per_sector'] == 0: # 如果每扇区字节数为0 (无效)
print("Error: Bytes per sector is zero in boot sector.") # 打印错误信息
return None # 返回None
params['bytes_per_cluster'] = params['bytes_per_sector'] * params['sectors_per_cluster'] # 计算每簇字节数
if params['bytes_per_cluster'] == 0: # 如果每簇字节数为0 (无效)
print("Error: Bytes per cluster is zero (bytes_per_sector or sectors_per_cluster is zero).") # 打印错误信息
return None # 返回None
params['first_fat_sector'] = params['reserved_sector_count'] + params['hidden_sector_count'] # 计算第一个FAT表的起始扇区
# 注意:hidden_sector_count通常是相对于整个物理磁盘的,如果我们在操作一个分区,
# 那么我们读取的扇区0已经是分区的开始了,所以这里的hidden_sector_count (如果来自MBR分区表)
# 已经隐含在我们的扇区0中了。引导扇区中的hidden_sector_count (BPB_HiddSec) 是指
# 分区开始前的扇区数。当我们直接读取分区设备 (如 /dev/sdb1 或 \.E:) 时,
# 我们读取的扇区0就是分区的第一个扇区 (即DBR),此时DBR中的hidden_sector_count应该用于
# 计算相对于整个物理磁盘的绝对扇区号,但对于分区内的相对扇区号,可以直接用reserved_sector_count。
# 为了简化,我们假设我们读取的就是分区的DBR,且其hidden_sector_count是0,或者我们忽略它
# 因为我们操作的是分区内的相对扇区。
# 更准确地说,FAT表的开始扇区是相对于分区开始的。
params['first_fat_sector_relative'] = params['reserved_sector_count'] # 第一个FAT表相对于分区开始的扇区号
params['first_data_sector_relative'] = params['first_fat_sector_relative'] +
(params['num_fats'] * params['sectors_per_fat_32']) # 计算第一个数据扇区相对于分区开始的扇区号
# FAT12/16的根目录大小是固定的,FAT32的根目录在数据区
# root_dir_sectors = ((params['root_entry_count'] * 32) + (params['bytes_per_sector'] - 1)) // params['bytes_per_sector']
# if params['root_entry_count'] != 0: # FAT12/16
# params['first_data_sector_relative'] += root_dir_sectors
# 计算总数据簇数
total_sectors = params['total_sectors_32'] if params['total_sectors_32'] != 0 else params['total_sectors_16'] # 获取总扇区数
if total_sectors == 0: # 如果总扇区数为0
print("Error: Total sectors is zero in boot sector.") # 打印错误信息
return None # 返回None
data_sectors = total_sectors - (params['reserved_sector_count'] +
params['num_fats'] * params['sectors_per_fat_32']) # 计算数据区扇区数
# data_sectors = total_sectors - params['first_data_sector_relative'] # 另一种计算方式
if params['sectors_per_cluster'] == 0: # 如果每簇扇区数为0
print("Error: Sectors per cluster is zero.") # 打印错误信息
return None # 返回None
params['total_clusters'] = data_sectors // params['sectors_per_cluster'] # 计算总簇数
# 确定文件系统类型 (除了字符串,还可以通过簇数判断)
# FAT12: clusters < 4085
# FAT16: 4085 <= clusters < 65525
# FAT32: clusters >= 65525
# (这个判断有时会与 filesystem_type_str 结合使用)
if params['total_clusters'] < 4085: # 如果总簇数小于4085
params['actual_fs_type'] = "FAT12" # 实际文件系统类型为FAT12
elif params['total_clusters'] < 65525: # 如果总簇数小于65525
params['actual_fs_type'] = "FAT16" # 实际文件系统类型为FAT16
else: # 其他情况
params['actual_fs_type'] = "FAT32" # 实际文件系统类型为FAT32
if not params['filesystem_type_str'].upper().startswith("FAT32"): # 如果文件系统类型字符串不是以FAT32开头
print(f"Warning: Filesystem type string is '{
params['filesystem_type_str']}', but cluster count suggests '{
params['actual_fs_type']}'.") # 打印警告
# 优先相信簇数判断,但也要注意字符串。对于本例,我们主要关心FAT32。
if params['actual_fs_type'] != "FAT32": # 如果实际类型也不是FAT32
print("This does not appear to be a FAT32 volume based on cluster count.") # 打印不是FAT32卷的提示
# return None # 可以选择在这里返回None如果严格要求FAT32
# 检查sectors_per_fat_32是否为0,如果是0,则这不是一个有效的FAT32 BPB
if params['sectors_per_fat_32'] == 0:
print("Error: sectors_per_fat_32 (BPB_FATSz32) is 0, this is not a FAT32 volume.")
return None
except struct.error as e: # 捕获struct解包错误
print(f"Error parsing boot sector structure: {
e}") # 打印解析引导扇区结构错误
return None # 返回None
except UnicodeDecodeError as e: # 捕获Unicode解码错误
print(f"Error decoding string in boot sector (OEM, Label, or FSType): {
e}") # 打印解码字符串错误
# 可以继续,但某些字符串字段可能不正确
pass # 忽略错误,继续执行
return params # 返回解析出的参数字典
# --- 示例使用: 读取并解析引导扇区 ---
# if __name__ == "__main__":
# # ... (与之前相同的设备打开逻辑,获取 device_handle) ...
# # current_os_name = platform.system()
# # device_handle = None
# # if current_os_name == "Windows": device_handle = open_windows_device(r'\.E:')
# # elif current_os_name == "Linux": device_handle = open_linux_device("/dev/sdb1")
# if device_handle and (device_handle != INVALID_HANDLE_VALUE if current_os_name == "Windows" else True):
# print("
Reading Boot Sector (Sector 0)...") # 打印读取引导扇区信息
# boot_sector_content = read_sectors_from_device(device_handle, 0, 1, DEFAULT_SECTOR_SIZE) # 读取引导扇区内容
# if boot_sector_content: # 如果成功读取到引导扇区内容
# fat32_params = parse_fat32_boot_sector(boot_sector_content) # 解析引导扇区参数
# if fat32_params: # 如果成功解析出参数
# print("
--- Parsed FAT32 Boot Sector Parameters ---") # 打印解析出的FAT32引导扇区参数标题
# for key, value in fat32_params.items(): # 遍历参数字典
# if isinstance(value, bytes): # 如果值是字节串
# try:
# print(f" {key}: {value.decode('ascii', errors='replace')} (raw: {value})") # 打印解码后的字符串和原始字节串
# except: # 捕获异常
# print(f" {key}: (raw bytes) {value}") # 打印原始字节串
# elif isinstance(value, int) and key not in ['oem_name','volume_label','filesystem_type_str','actual_fs_type','jump_boot']: # 如果值是整数且不是特定字符串键
# print(f" {key}: {value} (0x{value:X})") # 打印十进制和十六进制值
# else: # 其他情况
# print(f" {key}: {value}") # 直接打印值
# # 使用这些参数进行后续操作,例如定位FAT表和根目录
# print(f"
Calculated first FAT relative sector: {fat32_params['first_fat_sector_relative']}") # 打印计算出的第一个FAT相对扇区
# print(f"Calculated first Data relative sector: {fat32_params['first_data_sector_relative']}") # 打印计算出的第一个数据相对扇区
# print(f"Calculated bytes per cluster: {fat32_params['bytes_per_cluster']}") # 打印计算出的每簇字节数
# else: # 如果解析失败
# print("Failed to parse the boot sector as FAT32.") # 打印解析引导扇区失败信息
# else: # 如果读取引导扇区内容失败
# print("Failed to read the boot sector from the device.") # 打印读取引导扇区失败信息
# # 关闭设备...
# # if current_os_name == "Windows": close_windows_device(device_handle)
# # elif current_os_name == "Linux": close_linux_device(device_handle)
# # else:
# # print("Device could not be opened.")
parse_fat32_boot_sector 函数解释:
使用 struct.unpack_from() 来从字节串中按指定的格式和偏移量解包数据。
它提取了FAT32引导扇区中的许多关键字段,并将它们存储在一个字典中。
进行了一些基本的有效性检查,如引导扇区签名 0xAA55。
计算了一些派生值,如 bytes_per_cluster (每簇字节数), first_fat_sector_relative (第一个FAT表相对于分区开始的扇区号), first_data_sector_relative (第一个数据扇区相对于分区开始的扇区号), 和 total_clusters (数据区总簇数)。
通过 total_clusters 粗略判断实际的文件系统类型,并与引导扇区中的 filesystem_type_str 进行比较。
重要: 对于 hidden_sector_count (BPB_HiddSec),其解释可能有些微妙。当直接操作一个分区设备(如 /dev/sdb1 或 \.E:)时,我们读取的第一个扇区就是该分区的DBR。DBR中的 BPB_HiddSec 字段指的是该分区之前有多少个隐藏扇区(即相对于整个物理磁盘的偏移)。在计算分区内部的FAT表和数据区起始扇区时,我们通常使用 BPB_RsvdSecCnt (保留扇区数) 作为DBR之后的第一个FAT表的起始偏移(相对于分区开始)。first_data_sector_relative 则是在此基础上加上所有FAT表占用的扇区数。
有了这些文件系统参数,我们就可以定位FAT表和数据区了。
5.1.2 读取和解析目录条目
目录(包括根目录和子目录)在FAT32中是由一系列32字节的目录条目组成的。我们需要读取这些条目,并特别已关注那些第一个字节为 0xE5 的条目。
# (接续之前的代码)
# 假设 fat32_bpb_params 字典已经通过 parse_fat32_boot_sector 获取
def get_cluster_sector(cluster_num, bpb_params): # 定义获取簇对应扇区的函数
"""计算给定簇号对应的第一个扇区号 (相对于分区开始)。"""
if cluster_num < 2: # 如果簇号小于2 (簇0和1是保留的)
# 这通常不应该发生,除非是根目录簇号 (但根目录的获取方式不同)
# 或者是一个无效的簇号
raise ValueError(f"Cluster number {
cluster_num} is invalid (must be >= 2).") # 抛出值错误
# 第一个数据簇是簇2
return bpb_params['first_data_sector_relative'] + (cluster_num - 2) * bpb_params['sectors_per_cluster'] # 计算扇区号
def read_cluster_chain(device_handle_or_file, start_cluster, bpb_params, fat_table_bytes=None): # 定义读取簇链的函数
"""
从给定的起始簇开始,读取整个簇链的数据。
如果提供了fat_table_bytes,则尝试从中读取FAT链 (这通常用于活动文件,而非已删除文件)。
对于已删除文件,我们通常不知道完整的链,所以这个函数可能需要调整
或者我们只读取起始簇(或假设连续性)。
此函数当前设计为读取一个目录或一个(可能非碎片化的)文件的数据。
:param device_handle_or_file: 设备句柄或文件对象。
:param start_cluster: int, 起始簇号。
:param bpb_params: dict, 解析后的引导扇区参数。
:param fat_table_bytes: bytes, (可选) 整个FAT表的内容。如果提供,则跟踪FAT链。
:return: bytes, 拼接起来的簇链数据;如果出错则返回None。
"""
all_data = bytearray() # 初始化一个可变字节数组用于存储所有数据
current_cluster = start_cluster # 设置当前簇为起始簇
visited_clusters = set() # 初始化一个集合用于存储已访问的簇,防止死循环
# FAT32条目是4字节的
fat_entry_size = 4 # FAT32每个条目的大小为4字节
max_clusters_to_read = bpb_params['total_clusters'] + 2 # 设定一个读取簇的最大上限,防止意外的超长链
while 0x00000002 <= current_cluster < 0x0FFFFFF0: # 当簇号在有效范围内 (不是空闲、坏簇、EOF的低位)
if current_cluster in visited_clusters: # 如果当前簇已访问过 (FAT链循环)
print(f"Error: Detected loop in FAT chain at cluster {
current_cluster}.") # 打印FAT链循环错误
# logger.error(f"FAT chain loop at cluster {current_cluster} starting from {start_cluster}.") # 记录错误日志
return None # 返回None
visited_clusters.add(current_cluster) # 将当前簇添加到已访问集合
if len(visited_clusters) > max_clusters_to_read : # 如果访问的簇数超过上限
print(f"Error: Exceeded maximum cluster read limit ({
max_clusters_to_read}) for chain starting at {
start_cluster}.") # 打印超出最大簇读取限制错误
# logger.error(f"Exceeded max cluster read for chain from {start_cluster}.") # 记录错误日志
return None # 返回None
try:
cluster_start_sector = get_cluster_sector(current_cluster, bpb_params) # 获取当前簇的起始扇区
# print(f"Reading cluster {current_cluster} at sector {cluster_start_sector}") # 打印正在读取的簇信息 (调试用)
cluster_data = read_sectors_from_device(
device_handle_or_file,
cluster_start_sector,
bpb_params['sectors_per_cluster'],
bpb_params['bytes_per_sector']
) # 读取当前簇的数据
if not cluster_data: # 如果读取簇数据失败
print(f"Error: Failed to read data for cluster {
current_cluster}.") # 打印读取簇数据失败错误
# logger.error(f"Failed to read cluster {current_cluster} data.") # 记录错误日志
return None # 返回None
all_data.extend(cluster_data) # 将读取到的簇数据追加到总数据中
except ValueError as ve: # 捕获值错误 (例如 get_cluster_sector 抛出的)
print(f"Error calculating sector for cluster {
current_cluster}: {
ve}") # 打印计算扇区错误
return None # 返回None
except Exception as e: # 捕获其他异常
print(f"Unexpected error reading cluster {
current_cluster}: {
e}") # 打印意外错误
return None # 返回None
if fat_table_bytes: # 如果提供了FAT表字节数据
# 从FAT表中查找下一个簇
fat_offset = current_cluster * fat_entry_size # 计算当前簇在FAT表中的偏移量
if fat_offset + fat_entry_size > len(fat_table_bytes): # 如果偏移量超出FAT表范围
print(f"Error: FAT offset {
fat_offset} for cluster {
current_cluster} is out of FAT table bounds ({
len(fat_table_bytes)}).") # 打印FAT偏移量越界错误
# logger.error(f"FAT offset out of bounds for cluster {current_cluster}.") # 记录错误日志
# 这种情况通常不应该发生,如果FAT表被正确读取且簇号有效
return bytes(all_data) # 返回已读取的数据,认为这是链的末尾(异常情况)
# FAT32条目是4字节,低28位是簇号
next_cluster_entry = struct.unpack_from('<I', fat_table_bytes, fat_offset)[0] # 解包获取下一个簇的条目值
next_cluster = next_cluster_entry & 0x0FFFFFFF # 取低28位作为下一个簇号
# print(f" FAT[{current_cluster}] = 0x{next_cluster_entry:08X} -> Next cluster: {next_cluster} (0x{next_cluster:08X})") # 打印FAT条目信息 (调试用)
current_cluster = next_cluster # 更新当前簇为下一个簇
else:
# 如果没有FAT表,我们无法跟踪链。对于目录恢复,我们通常会读取目录的所有簇。
# 对于已删除文件,我们可能只读取第一个簇,或者假设它是连续的(如果知道大小)。
# 这个函数当前设计为如果没FAT表就只读一个簇然后停止(因为不知道下一个在哪)。
# print("No FAT table provided, stopping after reading one cluster.") # 打印无FAT表提示 (调试用)
break # 跳出循环 (只读取一个簇)
# 循环结束,current_cluster现在的值决定了链的终止原因
if not (0x0FFFFFF8 <= current_cluster <= 0x0FFFFFFF or current_cluster == 0x0FFFFFF7 or current_cluster == 0x0000000): # 如果不是正常的EOF, 坏簇或空闲簇结束
if fat_table_bytes and current_cluster != 0: # 如果提供了FAT表且当前簇不为0 (表示非正常结束)
print(f"Warning: FAT chain ended with non-EOF/Bad/Free marker: 0x{
current_cluster:08X} (from start_cluster {
start_cluster})") # 打印FAT链非正常结束警告
# logger.warning(f"FAT chain from {start_cluster} ended unexpectedly with marker 0x{current_cluster:08X}") # 记录警告日志
return bytes(all_data) # 返回拼接后的字节数据
def parse_sfn_directory_entry(entry_bytes, encoding='cp437'): # 定义解析SFN目录条目的函数
"""
解析一个32字节的短文件名(SFN)目录条目。
:param entry_bytes: bytes, 32字节的目录条目数据。
:param encoding: str, 用于解码文件名和扩展名的编码。默认为'cp437' (IBM PC)。
对于中文环境,可能是 'gbk' 或 'cp936',但这取决于系统如何写入。
对于U盘,'cp437' 或 'oem' (通常是cp437的别名) 是常见的。
:return: dict, 包含解析出的条目信息的字典;如果条目无效或为空则返回None。
"""
if len(entry_bytes) != 32: # 如果条目字节长度不为32
return None # 返回None
entry = {
} # 初始化条目字典
entry['first_byte'] = entry_bytes[0] # 获取第一个字节
if entry['first_byte'] == 0x00: # 如果第一个字节是0x00 (条目未使用且之后也没有条目)
return None # 表示目录结束,返回None
# 解析短文件名 (8字节) 和扩展名 (3字节)
raw_sfn_name = entry_bytes[0:8] # 获取原始短文件名
raw_sfn_ext = entry_bytes[8:11] # 获取原始短扩展名
# 尝试用指定编码解码,替换无法解码的字符
# 并去除填充的空格
try:
sfn_name_str = raw_sfn_name.decode(encoding, errors='replace').strip() # 解码短文件名并去除空格
sfn_ext_str = raw_sfn_ext.decode(encoding, errors='replace').strip() # 解码短扩展名并去除空格
except UnicodeDecodeError: # 捕获Unicode解码错误
# 如果解码失败,尝试用更宽容的方式或保留原始字节
sfn_name_str = "".join(chr(b) if 32 <= b <= 126 else '?' for b in raw_sfn_name).strip() # 尝试ASCII转换
sfn_ext_str = "".join(chr(b) if 32 <= b <= 126 else '?' for b in raw_sfn_ext).strip() # 尝试ASCII转换
# logger.warning(f"UnicodeDecodeError for SFN entry, using lossy conversion. Raw SFN: {raw_sfn_name}, Ext: {raw_sfn_ext}") # 记录警告日志
entry['sfn_name'] = sfn_name_str # 保存短文件名
entry['sfn_extension'] = sfn_ext_str # 保存短扩展名
if sfn_ext_str: # 如果扩展名存在
entry['sfn_full_name'] = f"{
sfn_name_str}.{
sfn_ext_str}" # 构造完整短文件名
else: # 如果扩展名不存在
entry['sfn_full_name'] = sfn_name_str # 完整短文件名即为文件名
entry['attributes'] = entry_bytes[11] # 获取属性字节
entry['is_readonly'] = bool(entry['attributes'] & 0x01) # 判断是否只读
entry['is_hidden'] = bool(entry['attributes'] & 0x02) # 判断是否隐藏
entry['is_system'] = bool(entry['attributes'] & 0x04) # 判断是否系统文件
entry['is_volume_label'] = bool(entry['attributes'] & 0x08) # 判断是否卷标
entry['is_subdirectory'] = bool(entry['attributes'] & 0x10) # 判断是否子目录
entry['is_archive'] = bool(entry['attributes'] & 0x20) # 判断是否存档文件
entry['is_lfn_entry'] = (entry['attributes'] == 0x0F) # 判断是否LFN条目
if entry['is_lfn_entry']: # 如果是LFN条目
# LFN条目有不同的结构,这里我们主要已关注SFN,所以简单标记并返回
entry['type'] = 'lfn' # 类型为lfn
# 可以进一步解析LFN条目内容,例如序号、字符片段等,但对恢复已删除SFN不是直接必须
# LFN条目的第一个字节如果是0xE5,也表示它关联的SFN被删除了
return entry # 返回LFN条目信息
entry['type'] = 'sfn' # 类型为sfn
# 时间和日期解析 (这里只解包,具体转换成datetime对象较复杂,暂略)
# NT Reserved (Byte 12) - 忽略
# CreateTimeTenth (Byte 13) - 创建时间的毫秒数 (0-199)
# CreateTime (Bytes 14-15)
# CreateDate (Bytes 16-17)
# LastAccessDate (Bytes 18-19)
entry['nt_reserved'] = entry_bytes[12] # 获取NT保留字节
entry['create_time_ms_tenths'] = entry_bytes[13] # 获取创建时间的毫秒精度
entry['create_time_raw'] = struct.unpack_from('<H', entry_bytes, 14)[0] # 获取原始创建时间
entry['create_date_raw'] = struct.unpack_from('<H', entry_bytes, 16)[0] # 获取原始创建日期
entry['last_access_date_raw'] = struct.unpack_from('<H', entry_bytes, 18)[0] # 获取原始最后访问日期
# 起始簇号 (高16位在偏移20-21,低16位在偏移26-27)
high_cluster_word = struct.unpack_from('<H', entry_bytes, 20)[0] # 获取高16位簇号
low_cluster_word = struct.unpack_from('<H', entry_bytes, 26)[0] # 获取低16位簇号
entry['start_cluster'] = (high_cluster_word << 16) | low_cluster_word # 组合成32位起始簇号
entry['file_size_bytes'] = struct.unpack_from('<I', entry_bytes, 28)[0] # 获取文件大小 (字节)
# 最后修改时间和日期
entry['last_write_time_raw'] = struct.unpack_from('<H', entry_bytes, 22)[0] # 获取原始最后写入时间
entry['last_write_date_raw'] = struct.unpack_from('<H', entry_bytes, 24)[0] # 获取原始最后写入日期
# 对时间戳进行解码 (这是一个辅助函数,可以单独实现)
entry['last_write_datetime'] = decode_fat_datetime(entry['last_write_date_raw'], entry['last_write_time_raw']) # 解码最后写入日期时间
entry['create_datetime'] = decode_fat_datetime(entry['create_date_raw'], entry['create_time_raw'], entry['create_time_ms_tenths']) # 解码创建日期时间
entry['last_access_date'] = decode_fat_date(entry['last_access_date_raw']) # 解码最后访问日期
if entry['first_byte'] == 0xE5: # 如果第一个字节是0xE5 (已删除文件/目录)
entry['is_deleted'] = True # 标记为已删除
# 对于已删除的文件,其短文件名的第一个字符被替换为0xE5
# 有时可以尝试猜测原始字符,但通常我们只用剩余部分
# entry['sfn_name'] = '?' + entry['sfn_name'][1:] # (一种表示方式,不一定准确)
elif entry['first_byte'] == 0x05: # 如果第一个字节是0x05 (特殊情况,首字符实际是0xE5)
entry['is_deleted'] = False # 实际未删除 (但首字符是E5)
# sfn_name_bytes = b'xE5' + raw_sfn_name[1:]
# entry['sfn_name'] = sfn_name_bytes.decode(encoding, errors='replace').strip()
# 重新构造sfn_name和sfn_full_name
corrected_sfn_name_bytes = b'xE5' + raw_sfn_name[1:8] # 取原始字节,将首字节替换为0xE5
try:
entry['sfn_name'] = corrected_sfn_name_bytes.decode(encoding, errors='replace').strip() # 解码修正后的文件名
except UnicodeDecodeError: # 捕获解码错误
entry['sfn_name'] = "".join(chr(b) if 32 <= b <= 126 else '?' for b in corrected_sfn_name_bytes).strip() # 尝试ASCII转换
if entry['sfn_extension']: # 如果扩展名存在
entry['sfn_full_name'] = f"{
entry['sfn_name']}.{
entry['sfn_extension']}" # 构造完整短文件名
else: # 如果扩展名不存在
entry['sfn_full_name'] = entry['sfn_name'] # 完整短文件名即为文件名
else: # 其他情况
entry['is_deleted'] = False # 未删除
# 如果是卷标或LFN,则起始簇和大小无意义或不同
if entry['is_volume_label'] or entry['is_lfn_entry']: # 如果是卷标或LFN条目
entry['start_cluster'] = 0 # 起始簇设为0
entry['file_size_bytes'] = 0 # 文件大小设为0
return entry # 返回解析出的条目信息
# --- 辅助函数:解码FAT日期和时间 ---
def decode_fat_datetime(fat_date, fat_time, ms_tenths=0): # 定义解码FAT日期时间的函数
"""
将FAT格式的日期和时间戳转换为datetime对象。
:param fat_date: int, 16位的FAT日期。
:param fat_time: int, 16位的FAT时间。
:param ms_tenths: int, (可选) 创建时间的10毫秒精度 (0-199)。
:return: datetime.datetime对象;如果日期/时间无效则返回None。
"""
import datetime # 导入datetime模块
if fat_date == 0 and fat_time == 0: # 如果日期和时间都为0 (通常表示未使用)
return None # 返回None
try:
# FAT日期: bits 15-9: 年份 (相对于1980); bits 8-5: 月份 (1-12); bits 4-0: 日 (1-31)
year = ((fat_date >> 9) & 0x7F) + 1980 # 提取年份
month = (fat_date >> 5) & 0x0F # 提取月份
day = fat_date & 0x1F # 提取日期
# FAT时间: bits 15-11: 小时 (0-23); bits 10-5: 分钟 (0-59); bits 4-0: 秒/2 (0-29)
hour = (fat_time >> 11) & 0x1F # 提取小时
minute = (fat_time >> 5) & 0x3F # 提取分钟
second_div_2 = fat_time & 0x1F # 提取秒数 (除以2的值)
second = second_div_2 * 2 # 计算实际秒数
# 处理创建时间的毫秒精度 (ms_tenths 是 0-199, 对应 0-1990 毫秒)
# 秒数加上来自 create_time_ms_tenths 的额外秒数
# (ms_tenths / 100) 是秒, (ms_tenths % 100) * 10 是毫秒
# 不过FAT标准时间戳只有2秒精度,ms_tenths 主要用于创建时间
microsecond = 0 # 初始化微秒为0
if ms_tenths > 0 and second < 59 : # 如果有毫秒精度且秒数小于59 (避免进位到60)
# Windows CHKDSK 和其他工具可能会在创建时间使用这个字段。
# 标准的FAT时间只有2秒精度。这个ms_tenths让创建时间能精确到10ms。
# 如果ms_tenths >= 100, 表示加1秒。
if ms_tenths >= 100: # 如果ms_tenths大于等于100
second += 1 # 秒数加1
ms_tenths -= 100 # ms_tenths减去100
microsecond = ms_tenths * 10000 # 计算微秒 (10ms = 10000 us)
# 确保月份和日期在有效范围内,datetime会进行一些检查,但我们可以预检
if not (1 <= month <= 12): month = 1 # 如果月份无效,设为1 (或标记为错误)
if not (1 <= day <= 31): day = 1 # 如果日期无效,设为1 (或标记为错误)
return datetime.datetime(year, month, day, hour, minute, second, microsecond) # 返回datetime对象
except ValueError as e: # 捕获值错误 (例如日期不合法)
# print(f"Warning: Invalid FAT date/time value (Date:0x{fat_date:04X}, Time:0x{fat_time:04X}). {e}") # 打印无效日期/时间警告
return None # 返回None
except Exception as e: # 捕获其他异常
# print(f"Warning: Error decoding FAT datetime (Date:0x{fat_date:04X}, Time:0x{fat_time:04X}). {e}") # 打印解码日期/时间错误警告
return None # 返回None
def decode_fat_date(fat_date): # 定义解码FAT日期的函数 (仅日期)
"""将FAT格式的日期戳转换为date对象。"""
import datetime # 导入datetime模块
if fat_date == 0: return None # 如果日期为0,返回None
try:
year = ((fat_date >> 9) & 0x7F) + 1980 # 提取年份
month = (fat_date >> 5) & 0x0F # 提取月份
day = fat_date & 0x1F # 提取日期
if not (1 <= month <= 12): month = 1 # 如果月份无效,设为1
if not (1 <= day <= 31): day = 1 # 如果日期无效,设为1
return datetime.date(year, month, day) # 返回date对象
except: # 捕获任何异常
return None # 返回None
def parse_lfn_directory_entry(entry_bytes, current_lfn_parts): # 定义解析LFN目录条目的函数
"""
解析一个32字节的长文件名(LFN)目录条目,并将其字符片段追加到current_lfn_parts。
LFN条目以逆序出现,所以这个函数应该在遇到SFN之前被调用。
:param entry_bytes: bytes, 32字节的LFN条目数据。
:param current_lfn_parts: list, 用于累积LFN字符片段的列表 (从后往前)。
:return: bool, True如果这是一个有效的LFN条目,False否则。
"""
if len(entry_bytes) != 32 or entry_bytes[11] != 0x0F: # 如果长度不为32或属性不是0x0F
return False # 返回False (不是LFN条目)
lfn_entry = {
} # 初始化LFN条目字典
lfn_entry['order'] = entry_bytes[0] & 0x1F # 获取LFN序号 (低5位)
lfn_entry['is_last_lfn'] = bool(entry_bytes[0] & 0x40) # 判断是否最后一个LFN条目 (序号最高位置1)
# LFN字符存储在三个位置,每个字符是UTF-16LE (2字节)
# Chars 1-5: Offset 1-10 (10 bytes)
# Chars 6-11: Offset 14-25 (12 bytes)
# Chars 12-13: Offset 28-31 (4 bytes)
chars = bytearray() # 初始化字符字节数组
chars.extend(entry_bytes[1:11]) # 添加第1-5个字符的字节
chars.extend(entry_bytes[14:26])# 添加第6-11个字符的字节 (注意偏移量26是exclusive)
chars.extend(entry_bytes[28:32])# 添加第12-13个字符的字节
lfn_part_str = "" # 初始化LFN部分字符串
for i in range(0, len(chars), 2): # 每2个字节是一个UTF-16字符
char_bytes = chars[i:i+2] # 获取字符的字节
if char_bytes == b'xFFxFF' or char_bytes == b'x00x00': # 如果是填充符 (0xFFFF) 或结束符 (0x0000)
break # 停止解析此LFN条目的字符
try:
lfn_part_str += char_bytes.decode('utf-16-le') # 以utf-16-le解码字符并追加到字符串
except UnicodeDecodeError: # 捕获解码错误
lfn_part_str += '?' # 用问号替换无法解码的字符
# logger.warning(f"UnicodeDecodeError in LFN part: {char_bytes}") # 记录警告日志
# LFN条目是逆序存储的,我们收集时也按其原始顺序(即逆序片段)
# 最后再将所有片段反转并连接
current_lfn_parts.append({
'order': lfn_entry['order'], 'text': lfn_part_str, 'is_last': lfn_entry['is_last_lfn']}) # 将解析出的LFN片段信息添加到列表
return True # 返回True
def reconstruct_lfn_from_parts(lfn_parts_list): # 定义从LFN片段重构完整长文件名的函数
"""
从收集到的LFN片段列表中(期望是按逆序排列的)重构完整长文件名。
:param lfn_parts_list: list of dicts, 每个dict包含 'order', 'text', 'is_last'。
这个列表应该是从SFN往前回溯LFN条目时收集的,所以是逆序的。
:return: str, 重构的长文件名;如果片段无效或不连续则返回None。
"""
if not lfn_parts_list: # 如果列表为空
return None # 返回None
# LFN片段应该是按序号从高到低(因为我们是逆序读的)
# 我们需要按序号排序(从1到N),然后连接文本
lfn_parts_list.sort(key=lambda p: p['order']) # 按序号对LFN片段列表排序
# 验证序号是否连续且从1开始,以及最后一个标记是否正确
if not lfn_parts_list[0]['order'] == 1: # 如果第一个片段的序号不是1
# logger.warning(f"LFN reconstruction: First LFN part order is not 1 (is {lfn_parts_list[0]['order']}).") # 记录警告日志
return None # 返回None
is_last_lfn_found_correctly = False # 初始化是否正确找到最后一个LFN条目的标志
for i in range(len(lfn_parts_list)): # 遍历排序后的LFN片段列表
if i > 0 and lfn_parts_list[i]['order'] != lfn_parts_list[i-1]['order'] + 1: # 如果序号不连续
# logger.warning(f"LFN reconstruction: Discontinuity in LFN part orders ({lfn_parts_list[i-1]['order']} -> {lfn_parts_list[i]['order']}).") # 记录警告日志
return None # 返回None
if lfn_parts_list[i]['is_last'] and i != len(lfn_parts_list) - 1: # 如果标记为最后一个但不是列表中的最后一个
# logger.warning(f"LFN reconstruction: 'is_last' flag set on non-terminal LFN part (order {lfn_parts_list[i]['order']}).") # 记录警告日志
return None # 返回None
if lfn_parts_list[i]['is_last'] and i == len(lfn_parts_list) - 1: # 如果标记为最后一个且是列表中的最后一个
is_last_lfn_found_correctly = True # 设置标志为True
if not is_last_lfn_found_correctly and len(lfn_parts_list) > 0 : # 如果没有正确找到最后一个LFN条目且列表不为空
# 理论上,如果is_last_lfn_found_correctly为False,且lfn_parts_list不为空,
# 那么最后一个元素应该有 is_last = True。如果它没有,说明LFN序列不完整或损坏。
# 然而,有些实现可能不严格依赖这个is_last标志,只要序号连续即可。
# 为了更鲁棒,我们可能只检查连续性。
# 但标准规定最后一个LFN条目的序号(条目0的第一个字节)的bit 6 (0x40) 应被设置。
# 如果我们严格按此,则:
if not lfn_parts_list[-1]['is_last']:
# logger.warning("LFN reconstruction: The highest order LFN part is not marked as 'last'.")
# return None # 可以取消此严格检查
pass
full_lfn_string = "" # 初始化完整LFN字符串
for part in lfn_parts_list: # 遍历排序后的LFN片段列表
full_lfn_string += part['text'] # 拼接文本片段
# LFN字符串可能以NULL字符结尾,需要去除
return full_lfn_string.rstrip('x00') # 去除末尾的NULL字符并返回
def list_directory_entries_fat32(device_handle_or_file, dir_start_cluster, bpb_params, fat_table_bytes=None, encoding='cp437', include_lfn=True): # 定义列出FAT32目录条目的函数
"""
读取并解析一个FAT32目录(由起始簇号指定)中的所有目录条目。
:param device_handle_or_file: 设备句柄或文件对象。
:param dir_start_cluster: int, 目录的起始簇号。
:param bpb_params: dict, 解析后的引导扇区参数。
:param fat_table_bytes: bytes, (可选) 整个FAT表的内容,用于跟踪目录的簇链。
如果为None,则只读取第一个簇的目录(对于根目录可能足够,子目录需要链)。
对于已删除文件恢复,我们可能没有完整的FAT表。
:param encoding: str, 用于解码短文件名的编码。
:param include_lfn: bool, 是否尝试解析和关联长文件名。
:return: list of dicts, 每个字典代表一个解析后的SFN目录条目 (可能包含'lfn_full_name')。
"""
if dir_start_cluster == 0: # FAT32中,根目录的簇号在BPB中指定,不会是0。0通常是FAT12/16根目录区的特殊处理。
# 但对于FAT32,如果传入0,应该是一个错误。
# 不过,有些FAT32实现可能将根目录簇号为0视为特殊情况或错误。
# 实际根目录簇号在 bpb_params['root_cluster']
# logger.warning("list_directory_entries_fat32 called with dir_start_cluster=0. This is unusual for FAT32.") # 记录警告日志
# 我们应该使用 bpb_params['root_cluster'] 作为根目录的起始。
# 如果调用者确实想从簇0开始读(不推荐),那么它应该是一个无效簇。
# 这里假设调用者会传入正确的起始簇号。
if bpb_params.get('actual_fs_type') == 'FAT32' and bpb_params.get('root_cluster', 0) == 0:
print("Error: FAT32 root cluster is 0 in BPB, which is invalid.")
return []
elif bpb_params.get('actual_fs_type') != 'FAT32' and dir_start_cluster == 0:
# 对于FAT12/16,根目录有特殊处理,不通过簇链读取,这里不处理
print("Error: dir_start_cluster=0 is for FAT12/16 root, not handled here for FAT32 logic.")
return []
print(f"Listing directory starting at cluster: {
dir_start_cluster}") # 打印开始列出目录的簇信息
# logger.info(f"Parsing directory content for cluster chain starting at {dir_start_cluster}") # 记录日志
# 读取整个目录的簇链数据
# 对于已删除文件恢复,我们可能没有fat_table_bytes,此时read_cluster_chain可能只返回第一个簇
# 如果目录本身跨多个簇,且FAT链信息丢失,我们就只能获取第一个簇的内容。
# 但通常,目录的簇链在删除文件时不会被破坏(除非目录本身被删除或文件系统严重损坏)。
# 所以,如果能拿到FAT表,就应该传入。
dir_content_bytes = read_cluster_chain(device_handle_or_file, dir_start_cluster, bpb_params, fat_table_bytes) # 读取目录的簇链数据
if not dir_content_bytes: # 如果读取目录内容失败
print(f"Error: Failed to read content for directory starting at cluster {
dir_start_cluster}.") # 打印读取目录内容失败错误
# logger.error(f"Could not read directory content for cluster {dir_start_cluster}.") # 记录错误日志
return [] # 返回空列表
entries = [] # 初始化条目列表
lfn_buffer = [] # 初始化LFN片段缓冲区 (用于收集当前SFN之前的LFN条目)
entry_offset = 0 # 初始化条目偏移量
while entry_offset + 32 <= len(dir_content_bytes): # 当还有足够的字节解析一个32字节条目时循环
raw_entry = dir_content_bytes[entry_offset : entry_offset + 32] # 获取原始条目字节
first_byte = raw_entry[0] # 获取第一个字节
if first_byte == 0x00: # 如果第一个字节是0x00 (目录结束标记)
# print("Found end of directory marker (0x00).") # 打印找到目录结束标记信息 (调试用)
break # 跳出循环
attributes = raw_entry[11] # 获取属性字节
is_lfn = (attributes == 0x0F) # 判断是否LFN条目
if include_lfn and is_lfn: # 如果需要包含LFN且当前是LFN条目
# LFN条目,将其字符片段添加到缓冲区
# LFN条目是逆序存储的,所以我们添加到lfn_buffer的头部或尾部,最后统一处理
# 为了方便重构,我们按遇到的顺序(即从后往前)加入,最后反转
# parse_lfn_directory_entry 会处理单个LFN条目并追加到传入的列表
# current_lfn_parts 应该是从后往前收集的,所以新的LFN条目加在前面
# 或者,我们按遇到的顺序(从高序号到低序号)加入,然后排序/反转
# 为了简单,我们每次遇到LFN,都加到lfn_buffer里。
# 当遇到SFN时,再处理lfn_buffer。
temp_lfn_parts_for_this_entry = [] # 临时LFN片段列表
if parse_lfn_directory_entry(raw_entry, temp_lfn_parts_for_this_entry): # 解析LFN条目
lfn_buffer.extend(temp_lfn_parts_for_this_entry) # 将解析出的LFN片段添加到缓冲区
else: # 如果解析LFN条目失败 (不应发生,因为已检查属性)
# logger.warning(f"LFN entry at offset {entry_offset} failed basic LFN parse despite attr 0x0F.") # 记录警告日志
lfn_buffer.clear() # 清空LFN缓冲区,因为序列可能已破坏
else: # 不是LFN条目,那么它就是SFN条目 (或者是卷标等)
sfn_entry_data = parse_sfn_directory_entry(raw_entry, encoding) # 解析SFN目录条目
if sfn_entry_data: # 如果成功解析出SFN条目数据
if sfn_entry_data['is_lfn_entry']: # 如果解析SFN的函数返回它是个LFN (这不应该发生,除非逻辑错误)
# logger.error("parse_sfn_directory_entry unexpectedly returned an LFN type. Clearing LFN buffer.") # 记录错误日志
lfn_buffer.clear() # 清空LFN缓冲区
else: # 确认是SFN条目
if include_lfn and lfn_buffer: # 如果需要包含LFN且LFN缓冲区不为空
# 我们刚刚遇到了一个SFN,并且之前有一些LFN条目
# LFN条目在lfn_buffer中是按它们在磁盘上出现的顺序(从高序号到低序号)
# 我们需要反转这个顺序,因为reconstruct_lfn_from_parts期望的是从SFN往前追溯的顺序
# (即序号从1到N)
lfn_buffer.reverse() # 反转LFN缓冲区,使其序号从小到大
reconstructed_lfn = reconstruct_lfn_from_parts(lfn_buffer) # 重构长文件名
if reconstructed_lfn: # 如果成功重构
sfn_entry_data['lfn_full_name'] = reconstructed_lfn # 将重构的长文件名添加到SFN条目数据中
else: # 如果重构失败
# logger.warning(f"Failed to reconstruct LFN for SFN: {sfn_entry_data.get('sfn_full_name', 'N/A')}") # 记录警告日志
pass # 保留SFN名称
lfn_buffer.clear() # 清空LFN缓冲区,为下一个SFN做准备
# 不论LFN如何,都添加这个SFN条目(除非它是未使用的0xE5且不是我们要找的)
# 我们通常对所有条目都感兴趣,包括已删除的
# 对于已删除文件恢复,我们特别已关注 is_deleted == True 的条目
entries.append(sfn_entry_data) # 将SFN条目数据添加到列表
else: # 如果解析SFN条目失败 (例如,无效条目但不是0x00)
# logger.warning(f"Failed to parse potential SFN entry at offset {entry_offset}. First byte: {first_byte:02X}") # 记录警告日志
lfn_buffer.clear() # 清空LFN缓冲区,因为关联可能已断
entry_offset += 32 # 移动到下一个32字节条目的偏移量
return entries # 返回所有解析出的SFN条目列表
# --- 示例:列出根目录中已删除的文件 ---
# if __name__ == "__main__":
# # ... (打开设备,解析引导扇区得到 fat32_bpb_params) ...
# # current_os_name, device_handle, fat32_bpb_params 已获取
#
# if fat32_bpb_params and fat32_bpb_params.get('actual_fs_type') == "FAT32": # 如果参数有效且为FAT32
# root_dir_cluster = fat32_bpb_params['root_cluster'] # 获取根目录的起始簇号
# print(f"
--- Listing entries in Root Directory (starts at cluster {root_dir_cluster}) ---") # 打印列出根目录条目标题
#
# # 为了能跟踪目录的完整簇链(如果根目录跨多个簇),我们需要FAT表
# # 在实际恢复中,如果FAT表损坏或我们选择不读取它(因为可能被修改),
# # 这里的 list_directory_entries_fat32 可能只能列出第一个簇的目录项。
# # 暂时不传入fat_table_bytes,看看效果。
# # fat_table_content = None # (如果需要,可以先读取FAT表)
#
# # 获取FAT表 (如果需要且可行)
# fat_bytes_to_read = fat32_bpb_params['sectors_per_fat_32'] * fat32_bpb_params['bytes_per_sector'] # 计算FAT表总字节数
# fat_start_sector_for_read = fat32_bpb_params['first_fat_sector_relative'] # FAT表起始扇区
# print(f"Attempting to read FAT1 from sector {fat_start_sector_for_read}, size {fat_bytes_to_read} bytes...") # 打印尝试读取FAT1信息
# fat_table_content = read_sectors_from_device(device_handle, fat_start_sector_for_read,
# fat32_bpb_params['sectors_per_fat_32'],
# fat32_bpb_params['bytes_per_sector']) # 读取FAT表内容
# if not fat_table_content or len(fat_table_content) != fat_bytes_to_read: # 如果读取FAT表失败或长度不匹配
# print("Warning: Could not read the full FAT table. Directory listing might be incomplete if dir spans clusters.") # 打印读取FAT表失败警告
# # logger.warning("Failed to read full FAT table, directory chain traversal might be limited.") # 记录警告日志
# fat_table_content = None # 设为None,让 list_directory_entries_fat32 只读第一个簇
# else:
# print(f"Successfully read {len(fat_table_content)} bytes for FAT1.") # 打印成功读取FAT1字节数
#
# # 使用指定的编码,例如 'cp936' (简体中文GBK) 或 'cp437' (默认)
# # 这个编码非常重要,如果U盘上的文件名是非英文字符,错误的编码会导致乱码
# # 常见的U盘编码可能与操作系统的OEM代码页有关
# # 对于Windows中文版,可能是'gbk' (cp936的超集)
# # 对于英文版,可能是'cp437'或'cp1252'
# # 'utf-8' 通常不是FAT32短文件名的编码方式 (LFN使用UTF-16LE)
# filename_encoding = 'gbk' # 或者 'cp437' 或其他适合你U盘的编码
# print(f"Using filename encoding for SFN: {filename_encoding}") # 打印使用的文件名编码
#
# root_entries = list_directory_entries_fat32(device_handle, root_dir_cluster, fat32_bpb_params,
# fat_table_bytes=fat_table_content,
# encoding=filename_encoding,
# include_lfn=True) # 列出根目录条目
#
# if root_entries: # 如果获取到根目录条目
# print(f"
Found {len(root_entries)} SFN entries in the root directory.") # 打印找到的SFN条目数
# deleted_files_found = 0 # 初始化找到的已删除文件数
# for entry in root_entries: # 遍历每个条目
# entry_name_to_display = entry.get('lfn_full_name', entry.get('sfn_full_name', 'N/A')) # 获取要显示的文件名 (优先LFN)
# if entry.get('is_deleted'): # 如果条目标记为已删除
# deleted_files_found += 1 # 已删除文件数加1
# status = "DELETED" # 状态为已删除
# elif entry.get('is_volume_label'): # 如果是卷标
# status = "Volume Label" # 状态为卷标
# elif entry.get('is_subdirectory'): # 如果是子目录
# status = "Subdirectory" # 状态为子目录
# else: # 其他情况
# status = "File" # 状态为文件
#
# print(f" [{status}] Name: '{entry_name_to_display}'") # 打印状态和文件名
# print(f" SFN: '{entry.get('sfn_full_name', 'N/A')}', Attr: 0x{entry.get('attributes',0):02X}") # 打印SFN和属性
# if not entry.get('is_volume_label'): # 如果不是卷标
# print(f" Size: {entry.get('file_size_bytes', 0)} bytes, Start Cluster: {entry.get('start_cluster', 0)}") # 打印大小和起始簇
# # print(f" Created: {entry.get('create_datetime')}, Modified: {entry.get('last_write_datetime')}") # 打印创建和修改时间
#
# if entry.get('is_deleted') and not entry.get('is_subdirectory') and not entry.get('is_volume_label'): # 如果是已删除的文件 (非目录非卷标)
# # 这里可以尝试恢复这个文件
# print(f" ^-- This is a deleted file. Potential for recovery.") # 打印这是已删除文件,有恢复潜力
# # recovery_attempt(device_handle, entry, fat32_bpb_params, output_dir="recovered_fat32") # (下一步实现)
#
# if deleted_files_found == 0: # 如果未找到已删除文件
# print("No deleted file entries (starting with 0xE5) found in the root directory scan.") # 打印未找到已删除文件条目信息
# else: # 如果未获取到根目录条目
# print("No entries found or error reading root directory.") # 打印未找到条目或读取错误信息
#
# # 关闭设备
# # if current_os_name == "Windows": close_windows_device(device_handle)
# # elif current_os_name == "Linux": close_linux_device(device_handle)
# # else:
# # print("Failed to get valid FAT32 BPB parameters or open device.")
代码解释与要点:
get_cluster_sector(): 一个辅助函数,根据簇号和BPB参数计算该簇在分区中的起始扇区号。
read_cluster_chain():
这个函数用于读取从 start_cluster 开始的整个簇链的数据。
如果提供了 fat_table_bytes (整个FAT表的内容),它会尝试跟踪FAT链来读取所有属于该链的簇(直到遇到EOF或错误)。
对于已删除文件恢复,FAT链通常已被清除。 因此,如果不提供 fat_table_bytes(或者FAT表本身已损坏),这个函数当前只会读取 start_cluster 指向的第一个簇的数据。后续我们需要根据文件大小和“假设连续性”原则来调整它,或者只用它来读取目录内容(目录的FAT链通常在文件删除时不受影响)。
它包含一个简单的防死循环机制 (visited_clusters) 和最大读取簇数限制。
parse_sfn_directory_entry():
解析单个32字节的SFN(短文件名)目录条目。
提取文件名、扩展名、属性、起始簇号、文件大小、时间戳等。
处理 0xE5(已删除)和 0x00(目录结束)的第一个字节。
使用 decode_fat_datetime() 和 decode_fat_date() 辅助函数将FAT的时间戳转换为更易读的 datetime 对象。
正确组合高16位和低16位簇号得到32位起始簇号。
文件名编码 (encoding): 这是一个非常重要的参数。FAT的短文件名通常使用OEM代码页编码。对于英文系统可能是 cp437 或 cp1252,对于简体中文Windows系统通常是 gbk (或 cp936)。如果编码指定错误,非ASCII字符的文件名会显示为乱码。在实际工具中,可能需要允许用户指定或尝试自动检测编码。
decode_fat_datetime() 和 decode_fat_date(): 将FAT的16位日期和16位时间格式转换为Python的 datetime 或 date 对象。注意FAT时间的精度是2秒,创建时间可以通过 ms_tenths 字段达到10毫秒的精度。
parse_lfn_directory_entry() 和 reconstruct_lfn_from_parts():
用于处理长文件名 (LFN)。LFN条目的属性字节是 0x0F。
parse_lfn_directory_entry 解析单个LFN条目,提取其中的Unicode字符片段和序号。
reconstruct_lfn_from_parts 接收一个LFN片段列表(这些片段应该是从SFN条目“向上”逆序收集的),将它们按序号排序,然后拼接成完整的长文件名。它包含对LFN序列连续性和结束标记的一些基本检查。
list_directory_entries_fat32():
这是核心的目录遍历函数。它首先使用 read_cluster_chain() 读取指定目录(由 dir_start_cluster 定义)的所有簇的内容。
然后逐个32字节地解析这些内容。
它能够处理LFN条目和SFN条目的序列:当遇到LFN条目时,将其信息暂存到 lfn_buffer;当遇到紧随其后的SFN条目时,尝试用 lfn_buffer 中的信息重构长文件名,并将其关联到这个SFN条目。
返回一个包含所有解析出的SFN条目信息的列表,每个条目是一个字典,如果有关联的LFN,则字典中会包含 'lfn_full_name' 键。
主示例逻辑:
演示了如何打开设备、解析引导扇区、读取FAT表(可选但推荐用于完整目录遍历)、然后调用 list_directory_entries_fat32() 来列出根目录的内容。
它会打印出每个条目的信息,并特别标记出那些 is_deleted == True 的文件条目,这些就是我们尝试恢复的目标。
5.1.3 实现已删除文件的恢复逻辑 (FAT32)
现在我们有了已删除文件的目录条目信息(特别是起始簇号和文件大小),下一步是尝试读取这些文件的数据。
# (接续之前的代码)
# 假设 device_handle, fat32_bpb_params 已经可用
def recover_deleted_fat32_file_simple(device_handle_or_file, deleted_entry_info, bpb_params, output_directory): # 定义简单恢复已删除FAT32文件的函数
"""
尝试恢复一个已删除的FAT32文件 (主要基于起始簇和文件大小,假设连续性)。
:param device_handle_or_file: 设备句柄或文件对象。
:param deleted_entry_info: dict, 从 parse_sfn_directory_entry 返回的已删除文件条目信息。
:param bpb_params: dict, 解析后的引导扇区参数。
:param output_directory: str, 用于保存恢复文件的目录。
:return: bool, True 如果尝试恢复(可能部分)成功,False 否则。
"""
if not (deleted_entry_info and deleted_entry_info.get('is_deleted') and
not deleted_entry_info.get('is_subdirectory') and
not deleted_entry_info.get('is_volume_label')): # 如果条目信息无效、未删除、是子目录或卷标
# print("Debug: Entry is not a deleted file, skipping recovery.") # 打印调试信息
return False # 返回False
start_cluster = deleted_entry_info.get('start_cluster', 0) # 获取起始簇号
file_size_bytes = deleted_entry_info.get('file_size_bytes', 0) # 获取文件大小
if start_cluster < 2 or file_size_bytes == 0: # 如果起始簇无效或文件大小为0
# logger.info(f"Skipping recovery for '{deleted_entry_info.get('sfn_full_name')}': invalid start_cluster ({start_cluster}) or zero size.") # 记录日志
# print(f"Info: Cannot recover '{deleted_entry_info.get('sfn_full_name')}': invalid start cluster {start_cluster} or size {file_size_bytes}.") # 打印信息
return False # 返回False
# 构造恢复后的文件名 (优先使用LFN,其次SFN)
recovered_filename_base = deleted_entry_info.get('lfn_full_name', deleted_entry_info.get('sfn_full_name', 'recovered_file')) # 获取恢复文件名基础部分
# 清理文件名中的非法字符 (这是一个非常基础的清理)
recovered_filename_base = "".join(c for c in recovered_filename_base if c.isalnum() or c in (' ', '.', '_', '-')).strip() # 清理文件名
if not recovered_filename_base: recovered_filename_base = f"recovered_cluster_{
start_cluster}" # 如果基础文件名为空,则使用簇号命名
# 确保输出目录存在
if not os.path.exists(output_directory): # 如果输出目录不存在
try:
os.makedirs(output_directory) # 创建输出目录
except OSError as e: # 捕获OS错误
print(f"Error: Could not create output directory '{
output_directory}': {
e}") # 打印创建目录错误
# logger.error(f"Failed to create output dir for recovery '{output_directory}': {e}") # 记录错误日志
return False # 返回False
# 尝试构造唯一的输出文件名
output_file_path = os.path.join(output_directory, recovered_filename_base) # 构造输出文件路径
counter = 1 # 初始化计数器
temp_base, temp_ext = os.path.splitext(recovered_filename_base) # 分离基本名和扩展名
while os.path.exists(output_file_path): # 如果输出文件路径已存在
output_file_path = os.path.join(output_directory, f"{
temp_base}_{
counter}{
temp_ext}") # 构造新的输出文件路径 (添加序号)
counter += 1 # 计数器加1
print(f"Attempting to recover: '{
deleted_entry_info.get('sfn_full_name')}' (LFN: '{
deleted_entry_info.get('lfn_full_name')}')") # 打印尝试恢复信息
print(f" Start Cluster: {
start_cluster}, Size: {
file_size_bytes} bytes") # 打印起始簇和大小信息
print(f" Will save to: {
output_file_path}") # 打印将保存到的路径信息
# logger.info(f"Recovering deleted file: SFN='{deleted_entry_info['sfn_full_name']}', LFN='{deleted_entry_info.get('lfn_full_name')}', "
# f"StartCluster={start_cluster}, Size={file_size_bytes}, TargetPath='{output_file_path}'") # 记录日志
# 计算需要读取的总簇数
# bytes_per_cluster 一定不能为0,前面BPB解析时应该已检查
if bpb_params['bytes_per_cluster'] == 0: # 如果每簇字节数为0
print("Error: bytes_per_cluster is zero in BPB params. Cannot calculate num_clusters.") # 打印错误信息
return False # 返回False
num_clusters_to_read = (file_size_bytes + bpb_params['bytes_per_cluster'] - 1) // bpb_params['bytes_per_cluster'] # 计算需要读取的总簇数 (向上取整)
if num_clusters_to_read == 0 and file_size_bytes > 0 : num_clusters_to_read = 1 # 如果文件大小大于0但计算簇数为0,则至少读1簇
if num_clusters_to_read == 0 and file_size_bytes == 0: # 如果文件大小为0且簇数为0 (对于0字节文件)
try:
with open(output_file_path, 'wb') as outfile: # 以二进制写入模式打开输出文件
outfile.write(b'') # 写入空字节 (创建0字节文件)
print(f" Successfully created zero-byte file: {
output_file_path}") # 打印成功创建0字节文件信息
# logger.info(f"Created zero-byte file for '{deleted_entry_info['sfn_full_name']}' at {output_file_path}") # 记录日志
return True # 返回成功
except IOError as e: # 捕获IO错误
print(f" Error creating zero-byte file '{
output_file_path}': {
e}") # 打印创建0字节文件错误
# logger.error(f"IOError creating zero-byte file {output_file_path}: {e}") # 记录错误日志
return False # 返回失败
print(f" Calculated clusters to read: {
num_clusters_to_read}") # 打印计算出的需读取簇数信息
recovered_data = bytearray() # 初始化恢复数据的字节数组
# 简单恢复策略:假设文件是连续存储的 (这在FAT上通常不成立,除非文件很小或刚格式化后写入)
# 更高级的恢复会尝试扫描空闲簇或使用其他技术。
# 这里我们循环读取 num_clusters_to_read 个簇,从 start_cluster 开始。
# 注意:这并没有使用FAT表来跟踪链,因为对于已删除文件,链信息通常已丢失。
current_c = start_cluster # 初始化当前簇
clusters_successfully_read = 0 # 初始化成功读取的簇数
for i in range(num_clusters_to_read): # 循环需读取的簇数
# 检查簇号是否在合理范围内 (避免因错误的num_clusters_to_read导致读取过多)
if not (2 <= current_c < bpb_params['total_clusters'] + 2) : # 如果当前簇号不在有效范围内 (+2是因为簇号从2开始)
print(f" Warning: Calculated next cluster {
current_c} is out of valid range (2 to {
bpb_params['total_clusters']+1}). Stopping read.") # 打印簇号越界警告
# logger.warning(f"Recovery for '{deleted_entry_info['sfn_full_name']}': Next cluster {current_c} out of range. Read {clusters_successfully_read} clusters.") # 记录日志
break # 跳出循环
# print(f" Reading cluster {i+1}/{num_clusters_to_read} (Cluster No: {current_c})...") # 打印正在读取的簇信息 (调试用)
try:
cluster_sector = get_cluster_sector(current_c, bpb_params) # 获取当前簇的起始扇区
one_cluster_data = read_sectors_from_device(
device_handle_or_file,
cluster_sector,
bpb_params['sectors_per_cluster'],
bpb_params['bytes_per_sector']
) # 读取一个簇的数据
except ValueError as ve: # 捕获值错误
print(f" Error: Invalid cluster number {
current_c} encountered during recovery: {
ve}") # 打印无效簇号错误
# logger.error(f"Recovery for '{deleted_entry_info['sfn_full_name']}': Invalid cluster {current_c}. {ve}") # 记录错误日志
break # 跳出循环
if one_cluster_data: # 如果成功读取到簇数据
recovered_data.extend(one_cluster_data) # 将簇数据追加到恢复数据中
clusters_successfully_read += 1 # 成功读取的簇数加1
else: # 如果读取簇数据失败 (可能是坏道,或已到设备末尾)
print(f" Warning: Failed to read data for cluster {
current_c}. File may be incomplete.") # 打印读取簇数据失败警告
# logger.warning(f"Recovery for '{deleted_entry_info['sfn_full_name']}': Failed to read cluster {current_c}.") # 记录警告日志
# 我们可以选择停止,或者用0填充然后继续(如果想尽可能恢复大小)
# recovered_data.extend(b'x00' * bpb_params['bytes_per_cluster']) # (可选:用0填充)
break # 当前选择停止
current_c += 1 # 移动到下一个簇(假设连续)
if not recovered_data: # 如果没有恢复到任何数据
print(f" Failed: No data could be recovered for '{
deleted_entry_info.get('sfn_full_name')}' starting at cluster {
start_cluster}.") # 打印恢复失败信息
# logger.error(f"No data recovered for '{deleted_entry_info['sfn_full_name']}' from cluster {start_cluster}.") # 记录错误日志
return False # 返回失败
# 将恢复的数据写入文件 (只写入原始文件大小的部分)
try:
with open(output_file_path, 'wb') as outfile: # 以二进制写入模式打开输出文件
outfile.write(recovered_data[:file_size_bytes]) # 写入恢复的数据 (截取到原始文件大小)
print(f" Successfully recovered {
len(recovered_data[:file_size_bytes])} bytes to: {
output_file_path}") # 打印成功恢复信息
print(f" (Read {
clusters_successfully_read} clusters, totaling {
len(recovered_data)} bytes before truncation)") # 打印读取簇数和截断前总字节数
# logger.info(f"Recovered {len(recovered_data[:file_size_bytes])} bytes for '{deleted_entry_info['sfn_full_name']}' to {output_file_path}.") # 记录日志
return True # 返回成功
except IOError as e: # 捕获IO错误
print(f" Error writing recovered file '{
output_file_path}': {
e}") # 打印写入恢复文件错误
# logger.error(f"IOError writing recovered file {output_file_path}: {e}") # 记录错误日志
return False # 返回失败
# --- 将恢复逻辑集成到主示例 ---
# if __name__ == "__main__":
# # ... (之前的设备打开、BPB解析、FAT读取、目录列表示例代码) ...
# # 假设已经得到了 root_entries 列表
# # 并且 current_os_name, device_handle, fat32_bpb_params 可用
# # 并且 output_recovery_path 已定义
# if root_entries and fat32_bpb_params: # 如果有根目录条目和BPB参数
# print(f"
--- Attempting to Recover DELETED files from Root Directory ---") # 打印尝试恢复已删除文件标题
# recovered_count = 0 # 初始化已恢复文件计数
# for entry in root_entries: # 遍历每个条目
# if entry.get('is_deleted') and
# not entry.get('is_subdirectory') and
# not entry.get('is_volume_label') and
# entry.get('file_size_bytes', 0) > 0: # 如果是已删除、非目录、非卷标且大小大于0的文件
# # 尝试恢复
# if recover_deleted_fat32_file_simple(device_handle, entry, fat32_bpb_params, output_recovery_path): # 调用恢复函数
# recovered_count += 1 # 已恢复文件计数加1
# print("-" * 20) # 打印分隔线
# print(f"
Recovery attempt summary: {recovered_count} deleted file(s) processed for recovery.") # 打印恢复尝试摘要
# if recovered_count > 0: # 如果有恢复的文件
# print(f"Please check the output directory: {output_recovery_path}") # 提示检查输出目录
# # ... (关闭设备) ...
recover_deleted_fat32_file_simple 函数解释:
它接收已删除文件的目录条目信息、BPB参数和输出目录。
首先进行一些基本检查(例如,确保条目是已删除的文件,起始簇和文件大小有效)。
构造一个恢复后的文件名,并确保在输出目录中是唯一的。
计算理论上需要读取的簇的数量 (num_clusters_to_read),这是基于文件大小和每簇字节数向上取整得到的。
核心恢复逻辑 (简化版): 它通过一个循环,从 start_cluster 开始,假设文件数据是连续存储的,逐个读取 num_clusters_to_read 个簇。这是FAT32已删除文件恢复中最简单但也最不可靠的假设,因为文件很可能是碎片化的。
如果读取某个簇失败(例如遇到坏道),它会停止并可能只恢复了部分数据。
最后,将读取到的所有数据(最多截取到原始 file_size_bytes)写入到输出文件中。
重要局限性和改进方向:
碎片化问题: recover_deleted_fat32_file_simple 的最大问题是它假设文件是连续的。对于碎片化的已删除FAT文件,这种方法几乎肯定会失败或只恢复出文件的前几个片段,后续会读取到不相关的数据。
改进:
基于签名的连续性检查: 在读取每个簇后,可以尝试检查新加入的数据是否仍然符合某种文件类型的内部结构或签名。例如,如果正在恢复JPEG,新的簇数据是否看起来像JPEG的图像数据段?如果不是,则停止。
扫描空闲簇: 一个更高级的方法是,在获取了起始簇后,扫描整个FAT表(如果可用,但可能不准确)或整个数据区的空闲簇,尝试找到“最有可能”是该文件下一个片段的簇。这需要复杂的启发式算法,例如比较簇之间的数据相似性,或者利用文件类型已知的内部链接(如多媒体文件的box结构)。这非常困难。
用户辅助: 允许用户指定可能的最大文件大小,或者在恢复出多个可能的片段后,让用户尝试手动拼接。
FAT表的使用: 当前的简单恢复没有利用FAT表来重建簇链(因为假设它被清除了)。如果FAT表中的某些条目没有被完全清零,或者U盘使用了某种“延迟删除”机制,那么分析FAT表仍然可能找到一些线索。但这需要更复杂的FAT表解析逻辑,并且要能区分真正的空闲簇和可能属于已删除文件的“残留”链。
3… 递归恢复子目录: list_directory_entries_fat32 可以列出单个目录的内容。要恢复整个U盘的已删除文件,需要实现一个递归函数,从根目录开始,遍历所有子目录(包括已删除的子目录,如果其目录条目还在),并在每个目录中查找已删除的文件条目进行恢复。
* 恢复已删除的子目录本身也是一个挑战,因为它的簇链也可能丢失。
文件名编码: 正确处理短文件名的编码至关重要,否则恢复出来的非ASCII文件名会是乱码。需要提供选项或机制来选择正确的OEM代码页。
LFN恢复: 虽然我们解析了LFN,但在恢复已删除文件时,LFN条目(首字节也是0xE5)本身通常不直接包含数据位置信息。LFN主要用于恢复文件名。如果LFN条目本身被部分覆盖,长文件名可能也不完整。
与数据雕刻结合: 如果基于文件系统元数据的恢复(如此处讨论的)失败或效果不佳(特别是对于碎片化文件),数据雕刻(直接扫描原始磁盘数据寻找文件签名)可以作为一种补充或替代方法。我们将在后续部分讨论数据雕刻。
即使有这些局限性,上述代码也为我们提供了一个基于FAT32元数据进行已删除文件恢复的基础框架。通过理解其原理和不足,我们可以有针对性地进行改进。
第五部分:误删文件恢复实战
5.3 FAT32 文件系统中的碎片化文件恢复
在前面的讨论中,我们主要已关注的是连续存储的文件。然而,在实际使用中,文件系统(尤其是FAT32)会因为文件的创建、删除、修改而产生大量的碎片 (Fragmentation)。碎片化意味着一个文件的内容并非存储在连续的磁盘簇中,而是分散在磁盘的不同位置。这对文件恢复带来了显著的挑战。
5.3.1 理解文件碎片化的成因与影响
成因:
文件增长: 当一个已存在的文件需要增加大小时,如果其尾部没有足够的连续空闲空间,操作系统就必须在磁盘的其他位置分配新的簇来存储新增内容。
文件删除与创建: 删除文件会在磁盘上留下不连续的空闲空间块。后续创建新文件时,操作系统可能会利用这些分散的空闲块,导致新文件也是碎片化的。
磁盘空间不足: 当磁盘剩余空间较少且不连续时,新创建的文件几乎必然是碎片化的。
对恢复的影响:
无法依赖连续性: 简单地从起始簇开始连续读取数据块的方法不再适用。
需要跟踪FAT表: 恢复碎片化文件必须准确地解读文件分配表(FAT),因为FAT中记录了文件各个簇之间的链接关系。如果FAT条目本身损坏或被清除(例如在快速格式化后),恢复难度将急剧增加。
数据不完整风险: 如果文件对应的FAT链条中断或指向错误,恢复出来的文件可能会不完整或包含其他文件的数据。
5.3.2 FAT32中碎片化文件的存储方式回顾
再次回顾一下,FAT32通过文件分配表来管理文件的簇链。
目录条目中包含了文件的起始簇号。
FAT表中,每个条目对应一个簇:
如果条目值为 0x00000000,表示该簇空闲。
如果条目值为 0x0FFFFFFF (或 0xFFFFFFFF 等,取决于具体的FAT32实现,通常大于等于 0x0FFFFFF8),表示该簇是文件的最后一个簇。
如果条目值为其他非零值,它指向文件的下一个簇号。
因此,要恢复一个碎片化文件,我们需要:
从目录条目中找到文件的起始簇号。
以该起始簇号为索引,在FAT表中查找其对应的条目。
该条目的值就是文件的下一个簇号。
重复步骤2和3,直到遇到表示文件结束的FAT条目标记。
将依次读取到的所有簇的数据按顺序拼接起来,就得到了完整的文件内容。
5.3.3 恢复碎片化文件的策略与Python实现
策略核心:
定位FAT表: 首先需要准确找到FAT表在磁盘上的位置和大小。这通常可以从引导扇区(Boot Sector)或BPB(BIOS Parameter Block)中获取。
读取FAT表: 将FAT表的内容读入内存。由于FAT表可能很大,对于非常大的磁盘,可能需要分块读取或只读取活动FAT(通常有两个FAT表作为备份,FAT1是主用)。
解析目录条目: 扫描目录区域(通常从根目录开始,根目录的起始簇号也可以从BPB中获得),查找标记为已删除但仍有有效起始簇号的文件条目。
追踪簇链: 对于找到的已删除文件条目,获取其起始簇号。然后,利用内存中的FAT表数据,从该起始簇号开始,一步步追踪文件的簇链。
读取数据簇: 每追踪到一个有效的簇号,就从数据区读取该簇的内容。
重组文件: 将读取到的所有数据簇按正确的顺序拼接起来,保存为恢复后的文件。
Python 代码示例 (概念性增强)
假设我们已经有了上一节中读取扇区、解析BPB、解析目录条目的基础函数。现在我们重点已关注如何利用FAT表追踪簇链。
import struct
import os
# --- 假设已有的辅助函数 ---
def read_sector(disk_handle, sector_num, bytes_per_sector=512):
"""读取指定扇区的内容"""
# disk_handle: 通过 open(r'\.X:', 'rb') 打开的磁盘句柄 (X是U盘盘符)
# sector_num: 要读取的扇区号
# bytes_per_sector: 每扇区字节数
try:
disk_handle.seek(sector_num * bytes_per_sector) # 定位到扇区起始位置
sector_data = disk_handle.read(bytes_per_sector) # 读取一个扇区的数据
return sector_data # 返回扇区数据
except Exception as e:
print(f"读取扇区 {
sector_num} 失败: {
e}") # 打印错误信息
return None # 返回None表示失败
class FAT32BPB:
def __init__(self, boot_sector_data):
# ... (BPB解析代码,与之前类似,此处省略以突出核心逻辑) ...
self.bytes_per_sector = struct.unpack_from("<H", boot_sector_data, 11)[0] # 每扇区字节数
self.sectors_per_cluster = struct.unpack_from("<B", boot_sector_data, 13)[0] # 每簇扇区数
self.reserved_sectors = struct.unpack_from("<H", boot_sector_data, 14)[0] # 保留扇区数
self.num_fats = struct.unpack_from("<B", boot_sector_data, 16)[0] # FAT表数量
self.sectors_per_fat = struct.unpack_from("<I", boot_sector_data, 36)[0] # 每个FAT表占用的扇区数
self.root_dir_first_cluster = struct.unpack_from("<I", boot_sector_data, 44)[0] # 根目录簇号
# ... 其他BPB字段 ...
self.first_data_sector = self.reserved_sectors + (self.num_fats * self.sectors_per_fat) # 数据区起始扇区号
self.bytes_per_cluster = self.bytes_per_sector * self.sectors_per_cluster # 每簇字节数
def parse_directory_entry(entry_data):
"""解析32字节的目录条目"""
# entry_data: 32字节的目录条目数据
if entry_data[0] == 0xE5: # 第一个字节为0xE5表示已删除
filename_bytes = bytearray(entry_data[1:11]) # 获取文件名部分(不含第一个字节)
filename_bytes.insert(0, b'?') # 用 '?' 替换原来的0xE5,方便识别
filename = filename_bytes[0:8].strip().decode('ascii', errors='replace') # 解码文件名
extension = filename_bytes[8:11].strip().decode('ascii', errors='replace') # 解码扩展名
# 组合文件名和扩展名
if extension:
full_filename = f"{
filename}.{
extension}"
else:
full_filename = filename
attributes = entry_data[11] # 文件属性
is_directory = (attributes & 0x10) != 0 # 判断是否为目录
is_lfn_entry = (attributes & 0x0F) == 0x0F # 判断是否为长文件名条目
if is_lfn_entry: # 如果是长文件名条目,通常不直接处理其簇号
return {
"type": "lfn", "is_deleted": True}
first_cluster_high = struct.unpack_from("<H", entry_data, 20)[0] # 文件起始簇号高16位
first_cluster_low = struct.unpack_from("<H", entry_data, 26)[0] # 文件起始簇号低16位
first_cluster = (first_cluster_high << 16) + first_cluster_low # 组合成32位起始簇号
file_size = struct.unpack_from("<I", entry_data, 28)[0] # 文件大小
return {
# 返回解析结果
"type": "file_or_dir",
"is_deleted": True,
"filename": full_filename.replace('x00', ''), # 移除空字节并返回文件名
"attributes": attributes, # 返回文件属性
"is_directory": is_directory, # 返回是否为目录
"first_cluster": first_cluster, # 返回文件起始簇号
"file_size": file_size # 返回文件大小
}
elif entry_data[0] == 0x00: # 第一个字节为0x00表示目录条目结束
return {
"type": "end_of_directory"} # 返回目录结束标记
# ... (可以添加对正常文件条目的解析,但此处重点是已删除文件) ...
return None # 其他情况返回None
# --- 新增和修改的函数 ---
def load_fat_table(disk_handle, bpb):
"""加载FAT表到内存"""
# disk_handle: 磁盘句柄
# bpb: 解析后的BPB对象
fat_start_sector = bpb.reserved_sectors # FAT表的起始扇区号 (通常是第一个FAT)
fat_size_bytes = bpb.sectors_per_fat * bpb.bytes_per_sector # FAT表的总字节数
print(f"[*] 正在加载FAT表,起始扇区: {
fat_start_sector}, 大小: {
fat_size_bytes}字节") # 打印加载信息
fat_data = bytearray() # 初始化字节数组用于存储FAT数据
for i in range(bpb.sectors_per_fat): # 遍历FAT表占用的所有扇区
sector_data = read_sector(disk_handle, fat_start_sector + i, bpb.bytes_per_sector) # 读取当前扇区数据
if sector_data:
fat_data.extend(sector_data) # 将扇区数据追加到fat_data
else:
print(f"[!] 读取FAT表扇区 {
fat_start_sector + i} 失败") # 打印错误信息
return None # 读取失败返回None
# FAT表中的每个条目是4字节 (32位)
# 我们将其解析为一个整数列表,方便查找
num_fat_entries = len(fat_data) // 4 # 计算FAT条目数量
fat_table_values = [] # 初始化列表用于存储解析后的FAT条目值
for i in range(num_fat_entries): # 遍历所有FAT条目
# 每个FAT条目是4字节,小端序
entry_value = struct.unpack_from("<I", fat_data, i * 4)[0] # 解析FAT条目值
# FAT32中,簇号对应的FAT条目存储的是下一个簇号,但其高4位可能被保留或用作他用,需要屏蔽
# 实际应用中,很多实现直接使用这32位,但为了严谨,可以屏蔽高4位 (0x0FFFFFFF)
# 然而,对于末尾簇标记 (如0x0FFFFFFF, 0xFFFFFFFF),它们本身就利用了高位
# 关键是判断是否为特殊标记 (空闲, 坏簇, 文件结束)
fat_table_values.append(entry_value & 0x0FFFFFFF) # 将屏蔽高4位后的值添加到列表 (注意: 0x0FFFFFFF 标记本身也要保留)
# 更准确地说,应该只屏蔽保留位,但实际操作中,对0x0FFFFFFF以下的值进行屏蔽,
# 对于结束标记,它们通常大于0x0FFFFFF8,不需要屏蔽。
# 这里简化为对所有条目取低28位,但判断结束标记时要用原始值。
# 一个更通用的做法是直接使用32位值,然后根据范围判断其含义。
# 为了演示,我们先用简化的屏蔽,但需要注意其局限性。
# 修正:直接存储原始32位值,后续判断时再处理
fat_table_values_corrected = [] # 初始化修正后的FAT条目值列表
for i in range(num_fat_entries): # 再次遍历
entry_value = struct.unpack_from("<I", fat_data, i * 4)[0] # 解析FAT条目值
fat_table_values_corrected.append(entry_value) # 直接存储原始32位值
print(f"[*] FAT表加载完成,共 {
len(fat_table_values_corrected)} 个条目。") # 打印完成信息
return fat_table_values_corrected # 返回解析后的FAT条目列表
def get_cluster_chain(fat_table, start_cluster):
"""根据FAT表追踪文件的簇链"""
# fat_table: 加载到内存的FAT表 (整数列表)
# start_cluster: 文件的起始簇号
if start_cluster < 2 or start_cluster >= len(fat_table): # 检查起始簇号是否有效 (簇号从2开始)
print(f"[!] 无效的起始簇号: {
start_cluster}") # 打印错误信息
return [] # 返回空列表
cluster_chain = [] # 初始化簇链列表
current_cluster = start_cluster # 当前簇号初始化为起始簇号
# FAT32的结束标记通常是 >= 0x0FFFFFF8
# 0x00000000 是空闲簇
# 0x0FFFFFF7 是坏簇标记
# 其他值是下一个簇的簇号 (需要屏蔽高4位,如果它们被保留的话)
# 但实际上,簇号本身不会超过FAT表的最大索引,所以直接使用即可,
# 结束标记的值会非常大。
max_clusters_to_follow = len(fat_table) # 防止死循环,设置最大追踪簇数
count = 0 # 初始化计数器
while True:
cluster_chain.append(current_cluster) # 将当前簇号添加到簇链
count += 1 # 计数器加1
if count > max_clusters_to_follow: # 如果追踪簇数超过最大限制
print(f"[!] 追踪簇链时可能发生循环,已在簇 {
current_cluster} 中断。") # 打印警告信息
break # 跳出循环
if current_cluster >= len(fat_table): # 如果当前簇号超出FAT表范围
print(f"[!] 追踪簇链时发现无效簇号 {
current_cluster} (超出FAT表范围)。") # 打印错误信息
break # 跳出循环
next_cluster_raw_value = fat_table[current_cluster] # 从FAT表中获取当前簇对应的条目值
# print(f"DEBUG: 当前簇: {current_cluster}, FAT值: 0x{next_cluster_raw_value:08X}") # 调试信息
# 判断是否为文件结束标记
if next_cluster_raw_value >= 0x0FFFFFF8: # 如果是文件结束标记 (包括EOF, 坏簇等)
# EOF markers are typically 0x0FFFFFF8 through 0x0FFFFFFF
# 0x0FFFFFF7 is bad cluster
if next_cluster_raw_value == 0x0FFFFFF7: # 如果是坏簇标记
print(f"[!] 簇链在坏簇 {
current_cluster} 处中断。") # 打印警告信息
break # 跳出循环
elif next_cluster_raw_value == 0x00000000: # 如果是空闲簇 (意味着簇链意外中断)
print(f"[!] 簇链在空闲簇标记处 (簇 {
current_cluster} 指向 0) 中断。") # 打印警告信息
break # 跳出循环
else:
# 正常情况下,这个值就是下一个簇的簇号
# FAT32中簇号本身是28位的,存储在32位条目中,高4位保留。
# 但实际使用时,next_cluster_raw_value 就是下一个簇号。
current_cluster = next_cluster_raw_value # 更新当前簇号为下一个簇号
if current_cluster < 2: # 如果下一个簇号小于2 (无效簇号)
print(f"[!] 簇链中出现无效的下一个簇号: {
current_cluster} (来自簇 {
cluster_chain[-2] if len(cluster_chain)>1 else start_cluster})") # 打印错误信息
# 这种情况可能表示FAT表损坏或者文件确实在这里结束了(但没有正确标记EOF)
# 此时可以考虑停止,或者如果文件大小已知,可以尝试继续读取一定数量的簇
break # 跳出循环
return cluster_chain # 返回追踪到的簇链
def get_cluster_location(cluster_num, bpb):
"""计算簇在磁盘上的起始扇区号"""
# cluster_num: 簇号 (从2开始计数)
# bpb: 解析后的BPB对象
if cluster_num < 2: # 簇号必须大于等于2
raise ValueError("簇号必须大于等于2") # 抛出值错误
# 数据区的第一个簇是簇2
# 簇N的扇区号 = 数据区起始扇区号 + (N - 2) * 每簇扇区数
return bpb.first_data_sector + (cluster_num - 2) * bpb.sectors_per_cluster # 返回簇的起始扇区号
def recover_fragmented_file(disk_handle, bpb, fat_table, start_cluster, file_size, output_filename):
"""恢复碎片化文件"""
# disk_handle: 磁盘句柄
# bpb: BPB对象
# fat_table: FAT表
# start_cluster: 文件的起始簇号
# file_size: 文件大小 (用于确定读取多少数据)
# output_filename: 恢复后文件的保存路径
print(f"[*] 尝试恢复文件,起始簇: {
start_cluster}, 预期大小: {
file_size}字节, 输出到: {
output_filename}") # 打印恢复信息
cluster_chain = get_cluster_chain(fat_table, start_cluster) # 获取文件的簇链
if not cluster_chain: # 如果簇链为空
print(f"[!] 未能获取文件 {
output_filename} 的簇链。") # 打印错误信息
return False # 返回False表示失败
print(f"[*] 文件 {
output_filename} 的簇链: {
cluster_chain}") # 打印簇链信息
recovered_data = bytearray() # 初始化字节数组用于存储恢复的数据
bytes_to_recover = file_size # 初始化待恢复的字节数
for cluster_num in cluster_chain: # 遍历簇链中的每个簇号
if bytes_to_recover <= 0 and file_size > 0: # 如果已恢复足够字节且文件大小已知
# 文件大小为0的情况,需要读取整个簇链
break # 跳出循环
try:
cluster_start_sector = get_cluster_location(cluster_num, bpb) # 获取当前簇的起始扇区号
except ValueError as e:
print(f"[!] 计算簇 {
cluster_num} 位置失败: {
e}") # 打印错误信息
continue # 继续下一个簇
print(f" [-] 正在读取簇: {
cluster_num} (起始扇区: {
cluster_start_sector})") # 打印读取簇信息
cluster_data = bytearray() # 初始化字节数组用于存储当前簇的数据
for i in range(bpb.sectors_per_cluster): # 遍历簇内的每个扇区
sector_data = read_sector(disk_handle, cluster_start_sector + i, bpb.bytes_per_sector) # 读取扇区数据
if sector_data:
cluster_data.extend(sector_data) # 将扇区数据追加到簇数据
else:
print(f"[!] 读取簇 {
cluster_num} 中的扇区 {
cluster_start_sector + i} 失败。") # 打印错误信息
# 这里可以根据策略决定是跳过这个扇区还是中止整个簇的读取
break # 当前实现为中止簇读取
if not cluster_data: # 如果簇数据为空
print(f"[!] 未能读取簇 {
cluster_num} 的任何数据。") # 打印错误信息
continue # 继续下一个簇
# 根据剩余需要恢复的字节数,决定从当前簇中取多少数据
if file_size > 0: # 如果文件大小已知
bytes_from_this_cluster = min(len(cluster_data), bytes_to_recover) # 计算从当前簇获取的字节数
recovered_data.extend(cluster_data[:bytes_from_this_cluster]) # 将数据追加到恢复数据
bytes_to_recover -= bytes_from_this_cluster # 更新待恢复字节数
else: # 如果文件大小未知 (例如,从簇链直接恢复,不依赖目录条目中的文件大小)
recovered_data.extend(cluster_data) # 追加整个簇的数据
if not recovered_data: # 如果没有恢复到任何数据
print(f"[!] 未能从簇链中恢复任何数据: {
output_filename}") # 打印错误信息
return False # 返回False表示失败
# 实际恢复的文件大小可能与目录条目中的不完全一致,特别是簇链中断或错误时
actual_recovered_size = len(recovered_data) # 获取实际恢复的数据大小
if file_size > 0 and actual_recovered_size < file_size: # 如果实际恢复大小小于预期大小
print(f"[!] 警告: 文件 {
output_filename} 恢复不完整。预期大小: {
file_size}, 实际恢复: {
actual_recovered_size}") # 打印警告信息
elif actual_recovered_size == 0 and file_size > 0: # 如果预期有数据但未恢复到
print(f"[!] 错误: 文件 {
output_filename} 未恢复任何数据,但预期大小为 {
file_size}。") # 打印错误信息
return False
try:
with open(output_filename, 'wb') as f_out: # 以二进制写模式打开输出文件
f_out.write(recovered_data) # 写入恢复的数据
print(f"[+] 文件 {
output_filename} 成功恢复 ({
actual_recovered_size} 字节)。") # 打印成功信息
return True # 返回True表示成功
except IOError as e:
print(f"[!] 保存恢复的文件 {
output_filename} 失败: {
e}") # 打印IO错误信息
return False # 返回False表示失败
def scan_and_recover_deleted_files_fat32(disk_path, output_dir):
"""扫描FAT32分区并尝试恢复已删除的文件(包括碎片化的)"""
# disk_path: U盘路径,如 r'\.X:'
# output_dir: 恢复文件的输出目录
if not os.path.exists(output_dir): # 如果输出目录不存在
os.makedirs(output_dir) # 创建输出目录
disk_handle = None # 初始化磁盘句柄
try:
disk_handle = open(disk_path, 'rb') # 以二进制读模式打开磁盘
print(f"[*] 成功打开磁盘: {
disk_path}") # 打印成功信息
boot_sector_data = read_sector(disk_handle, 0) # 读取引导扇区数据
if not boot_sector_data: # 如果读取失败
print("[!] 无法读取引导扇区。") # 打印错误信息
return # 结束函数
bpb = FAT32BPB(boot_sector_data) # 解析BPB
print(f"[*] BPB解析完成: 每扇区字节数={
bpb.bytes_per_sector}, 每簇扇区数={
bpb.sectors_per_cluster}, 数据区起始扇区={
bpb.first_data_sector}") # 打印BPB信息
fat_table = load_fat_table(disk_handle, bpb) # 加载FAT表
if not fat_table: # 如果加载失败
print("[!] 无法加载FAT表。") # 打印错误信息
return # 结束函数
# 扫描根目录 (也可以扩展为递归扫描子目录)
# FAT32的根目录本身也是由簇链组成的,起始簇号在BPB中
current_dir_cluster = bpb.root_dir_first_cluster # 获取根目录起始簇号
# 为了简单起见,我们先假设根目录不至于非常非常大,可以一次性读取其所有簇
# 实际应用中,目录本身也可能跨多个簇,需要像文件一样追踪其簇链
dir_clusters = get_cluster_chain(fat_table, current_dir_cluster) # 获取目录的簇链
if not dir_clusters: # 如果目录簇链为空
print(f"[!] 无法获取根目录 (簇 {
current_dir_cluster}) 的簇链。") # 打印错误信息
return # 结束函数
print(f"[*] 根目录簇链: {
dir_clusters}") # 打印根目录簇链
dir_entry_offset = 0 # 初始化目录条目偏移量
lfn_buffer = [] # 初始化长文件名缓冲区
for cluster_num in dir_clusters: # 遍历目录簇链中的每个簇
try:
cluster_start_sector = get_cluster_location(cluster_num, bpb) # 获取簇的起始扇区号
except ValueError as e:
print(f"[!] 获取目录簇 {
cluster_num} 位置失败: {
e}") # 打印错误信息
continue # 继续下一个簇
print(f" [*] 正在扫描目录簇: {
cluster_num} (起始扇区: {
cluster_start_sector})") # 打印扫描信息
for i in range(bpb.sectors_per_cluster): # 遍历簇内的每个扇区
sector_data = read_sector(disk_handle, cluster_start_sector + i, bpb.bytes_per_sector) # 读取扇区数据
if not sector_data: # 如果读取失败
print(f"[!] 读取目录簇 {
cluster_num} 中的扇区 {
cluster_start_sector + i} 失败。") # 打印错误信息
break # 跳出内层循环,处理下一个簇
for j in range(0, bpb.bytes_per_sector, 32): # 每32字节一个目录条目
entry_data = sector_data[j : j + 32] # 获取目录条目数据
parsed_entry = parse_directory_entry(entry_data) # 解析目录条目
if parsed_entry:
if parsed_entry["type"] == "end_of_directory": # 如果是目录结束标记
print("[*] 到达目录末尾。") # 打印信息
return # 结束扫描
# 处理长文件名 (LFN) 条目
# LFN条目倒序存储,需要收集起来再组合
# 简单示例中,我们仅已关注短文件名条目来恢复
# 此处可以添加LFN处理逻辑
if parsed_entry["type"] == "file_or_dir" and parsed_entry["is_deleted"]: # 如果是已删除的文件或目录条目
# 我们主要已关注恢复文件
if not parsed_entry["is_directory"] and parsed_entry["first_cluster"] >= 2 and parsed_entry["file_size"] > 0 : # 如果是文件,且起始簇有效,且文件大小大于0
filename = parsed_entry["filename"] # 获取文件名
start_cluster = parsed_entry["first_cluster"] # 获取起始簇号
file_size = parsed_entry["file_size"] # 获取文件大小
# 为了避免文件名冲突,可以加上唯一标识
recovered_filename = f"recovered_{
dir_entry_offset}_{
filename.replace('?', '_').replace('/', '_').replace(':', '_')}" # 构建恢复文件名
output_path = os.path.join(output_dir, recovered_filename) # 构建输出路径
print(f"[*] 发现已删除文件: {
filename}, 起始簇: {
start_cluster}, 大小: {
file_size}") # 打印发现信息
recover_fragmented_file(disk_handle, bpb, fat_table, start_cluster, file_size, output_path) # 尝试恢复文件
elif parsed_entry["is_directory"]: # 如果是目录
# 此处可以递归进入子目录进行扫描
print(f"[*] 发现已删除的目录条目: {
parsed_entry['filename']} (起始簇: {
parsed_entry['first_cluster']}) - 暂不递归处理") # 打印信息
dir_entry_offset += 1 # 目录条目偏移量加1
except FileNotFoundError:
print(f"[!] 磁盘路径 {
disk_path} 未找到。请确保路径正确,并在Windows下以管理员权限运行脚本(如果需要直接访问物理驱动器)。") # 打印文件未找到错误
except PermissionError:
print(f"[!] 权限不足,无法打开磁盘 {
disk_path}。请尝试以管理员权限运行。") # 打印权限错误
except Exception as e:
print(f"[!] 发生未知错误: {
e}") # 打印未知错误
import traceback # 导入traceback模块
traceback.print_exc() # 打印详细的堆栈跟踪信息
finally:
if disk_handle: # 如果磁盘句柄存在
disk_handle.close() # 关闭磁盘句柄
print(f"[*] 已关闭磁盘: {
disk_path}") # 打印关闭信息
# --- 使用示例 ---
if __name__ == "__main__":
# !! 警告: 直接操作磁盘非常危险,请务必小心 !!
# !! 强烈建议在虚拟机中的U盘镜像上进行测试 !!
# 例如,如果你的U盘在Windows下是 E: 盘
# disk_to_scan = r'\.E:'
# 对于Linux,可能是 '/dev/sdb' 等,但Python的open()直接访问块设备可能需要特定权限或方法
# 为了安全和演示,以下路径仅为示例,你需要替换为实际的U盘设备路径或镜像文件路径
# 假设我们有一个U盘的镜像文件 'usb_image.dd'
# disk_to_scan = 'usb_image.dd' # 如果是镜像文件,可以直接打开
# 在实际物理U盘上操作 (Windows示例,需要管理员权限):
# disk_to_scan = r'\.X:' # 将 X 替换为你的U盘盘符
# output_recovery_dir = "D:/fat32_recovered_files_fragmented"
# print("警告:此脚本将尝试直接读取磁盘扇区。请确保目标磁盘路径正确,并理解潜在风险。") # 打印警告
# print("强烈建议先在磁盘镜像上测试。") # 打印建议
# proceed = input("是否继续? (yes/no): ") # 提示用户确认
# if proceed.lower() == 'yes': # 如果用户确认
# scan_and_recover_deleted_files_fat32(disk_to_scan, output_recovery_dir) # 执行扫描和恢复
# else:
# print("操作已取消。") # 打印取消信息
print("代码示例已提供。请在理解代码和风险后,替换disk_to_scan和output_recovery_dir路径,并在安全的环境下运行。") # 打印最终提示
print("直接操作物理磁盘需要管理员权限,并且有损坏数据的风险。") # 再次警告
代码解释与增强说明:
load_fat_table(disk_handle, bpb):
根据BPB信息计算FAT表的起始扇区和大小。
逐扇区读取FAT表数据,并将其加载到内存中。
将原始的字节数据解析为一个整数列表 fat_table_values_corrected,其中每个整数代表FAT表中的一个32位条目。我们直接存储原始值,因为结束标记等特殊值需要完整的32位来判断。
get_cluster_chain(fat_table, start_cluster):
核心函数,用于根据FAT表追踪一个文件的簇链。
输入内存中的FAT表和文件的起始簇号。
从起始簇开始,查询 fat_table 中对应索引的值,该值即为下一个簇的簇号。
持续追踪,直到遇到文件结束标记(通常 >= 0x0FFFFFF8)。
增加了对无效簇号、超出FAT表范围、以及潜在死循环的检查,以增强鲁棒性。
正确处理了FAT32中各种特殊标记(空闲簇、坏簇、文件结束符)。
get_cluster_location(cluster_num, bpb):
一个辅助函数,根据簇号和BPB信息计算该簇在磁盘上的绝对起始扇区号。
记住簇号从2开始计数。
recover_fragmented_file(disk_handle, bpb, fat_table, start_cluster, file_size, output_filename):
整合了簇链追踪和数据读取。
首先调用 get_cluster_chain 获取文件的所有簇号。
然后遍历簇链中的每一个簇号:
使用 get_cluster_location 找到簇的物理位置。
逐扇区读取该簇的所有数据。
将读取到的数据追加到 recovered_data。
根据目录条目中记录的 file_size 来决定读取多少数据。如果 file_size 为0或者恢复的数据不足,会打印警告。
最后将 recovered_data 写入到指定的输出文件。
scan_and_recover_deleted_files_fat32(disk_path, output_dir):
顶层函数,编排整个扫描和恢复流程。
打开磁盘,读取并解析BPB。
加载FAT表。
扫描目录:
从BPB获取根目录的起始簇号。
调用 get_cluster_chain 获取根目录本身占用的所有簇(因为根目录也可能跨簇)。
遍历目录的每个簇,再遍历簇中的每个扇区,最后按32字节解析每个目录条目。
当 parse_directory_entry 返回一个已删除的文件条目 (is_deleted 为 True,且不是目录,且起始簇号和文件大小有效) 时:
调用 recover_fragmented_file 尝试恢复。
包含了基本的错误处理(如磁盘未找到、权限不足)。
处理碎片化恢复的挑战与进一步完善方向:
FAT表损坏: 如果FAT表本身有损坏(例如,某个簇链指针错误,或指向了已分配给其他文件的簇),恢复出来的文件可能会包含不正确的数据或者提前结束。检测和处理FAT表一致性问题是一个高级主题。
文件大小未知或不准确: 目录条目中的文件大小信息可能因为删除操作或后续磁盘活动而被破坏。如果文件大小未知,我们可以尝试读取整个簇链,但这可能导致恢复出的文件末尾包含垃圾数据(如果簇链的末尾标记丢失或不正确)。如果文件大小比实际簇链所能提供的数据要大,则文件恢复不完整。
长文件名 (LFN): 上述代码主要基于短文件名(8.3格式)进行恢复。FAT32使用额外的LFN目录条目来存储长文件名。一个完整的文件恢复方案需要正确解析和重组LFN条目,以恢复原始的长文件名。LFN条目具有 0x0F 属性,并且以倒序存储在短文件名条目之前。
深度扫描与启发式方法: 当FAT表信息严重缺失或文件系统结构损坏时(例如快速格式化后,FAT表可能被部分或完全清零),单纯依赖FAT表追踪簇链将不再有效。这时就需要更高级的技术,如:
数据雕刻 (Data Carving): 忽略文件系统元数据,直接扫描磁盘寻找特定文件类型的文件头(header)和文件尾(footer)签名。我们将在后续讨论。
启发式簇链重组: 即使FAT链断裂,有时也可以通过分析簇与簇之间内容的关联性(例如,文本文件的连续性,图片或视频数据的结构特征)来尝试猜测和重组簇链。这非常复杂且成功率不一。
性能: 对于非常大的磁盘,一次性将整个FAT表加载到内存可能不可行或效率低下。可以考虑分块加载FAT,或者只加载活动FAT。读取磁盘也应该有更优化的缓冲策略。
用户界面和交互: 实际的数据恢复工具通常提供GUI,允许用户选择分区、预览可恢复文件、选择恢复选项等。
5.4 完善FAT32恢复代码:递归目录扫描与更智能的簇读取
5.4.1 实现递归目录扫描
到目前为止,我们的 scan_and_recover_deleted_files_fat32 函数只扫描了根目录。一个完整的文件系统包含多层目录结构。为了能够恢复子目录中的文件,我们需要实现递归扫描。
思路:
修改 scan_and_recover_deleted_files_fat32,使其接受一个起始簇号作为参数,代表要扫描的目录。
当在扫描目录时遇到一个目录条目(非已删除,或者我们想尝试恢复已删除的目录结构),并且该条目表示一个子目录(通过其属性字节判断),则递归调用扫描函数,传入该子目录的起始簇号。
修改后的 scan_and_recover_deleted_files_fat32 (概念性):
# ... (前面的函数定义不变) ...
def scan_directory_recursive(disk_handle, bpb, fat_table, dir_start_cluster, current_path, output_dir_base):
"""
递归扫描目录并尝试恢复文件。
dir_start_cluster: 当前要扫描的目录的起始簇号。
current_path: 当前目录在恢复结构中的相对路径 (用于组织输出)。
output_dir_base: 恢复文件的根输出目录。
"""
print(f"[*] 正在扫描目录,起始簇: {
dir_start_cluster}, 路径: '{
current_path}'") # 打印扫描信息
dir_clusters = get_cluster_chain(fat_table, dir_start_cluster) # 获取目录的簇链
if not dir_clusters: # 如果目录簇链为空
print(f"[!] 无法获取目录 (簇 {
dir_start_cluster}) 的簇链。") # 打印错误信息
return # 结束函数
dir_entry_offset = 0 # 初始化目录条目偏移量
lfn_parts = [] # 用于收集长文件名片段
for cluster_num in dir_clusters: # 遍历目录簇链中的每个簇
try:
cluster_start_sector = get_cluster_location(cluster_num, bpb) # 获取簇的起始扇区号
except ValueError as e:
print(f"[!] 获取目录簇 {
cluster_num} 位置失败: {
e}") # 打印错误信息
continue # 继续下一个簇
for i in range(bpb.sectors_per_cluster): # 遍历簇内的每个扇区
sector_data = read_sector(disk_handle, cluster_start_sector + i, bpb.bytes_per_sector) # 读取扇区数据
if not sector_data: # 如果读取失败
print(f"[!] 读取目录簇 {
cluster_num} 中的扇区 {
cluster_start_sector + i} 失败。") # 打印错误信息
break # 跳出内层循环
for j in range(0, bpb.bytes_per_sector, 32): # 每32字节一个目录条目
entry_data = sector_data[j : j + 32] # 获取目录条目数据
# 初步检查是否为LFN条目或有效短文件名条目
attribute = entry_data[11] # 获取属性字节
first_char_filename = entry_data[0] # 获取文件名的第一个字符
if first_char_filename == 0x00: # 目录条目列表结束
print(f"[*] 到达目录 '{
current_path}' (簇 {
dir_start_cluster}) 的末尾。") # 打印信息
return # 结束当前目录扫描
is_lfn = (attribute & 0x0F) == 0x0F # 判断是否为LFN条目
actual_filename_to_use = "" # 初始化实际使用的文件名
if is_lfn and first_char_filename != 0xE5: # 如果是LFN条目且未被删除标记 (0xE5)
# 这是解析LFN的核心部分
# LFN 条目是倒序存储的,最后一个LFN条目有最高的序列号并标记为LAST_LONG_ENTRY
# 序列号在第一个字节,例如 0x41 (LAST_LONG_ENTRY | 1), 0x02, 0x03...
# 我们需要从后向前收集它们,或者收集后反转
# 此处简化处理:假设LFN条目紧邻其短文件名条目出现
# 并且我们在这里简单地累加LFN片段,实际解析LFN更复杂
# 提取LFN字符 (Unicode, 2字节/字符)
# 偏移量: 1-10, 14-25, 28-31
lfn_chars_bytes = entry_data[1:11] + entry_data[14:26] + entry_data[28:32] # 拼接LFN字符字节
try:
# 移除填充的0xFFFF,并解码
lfn_part = lfn_chars_bytes.decode('utf-16le', errors='replace').rstrip('x00').rstrip('uffff') # 解码LFN片段
lfn_parts.append(lfn_part) # 添加到LFN片段列表
# print(f"Debug: LFN part raw: {entry_data[0]:02X}, chars: {lfn_part}") # 调试信息
except UnicodeDecodeError:
# print(f"Debug: LFN decode error for part.") # 调试信息
pass # 忽略解码错误
continue # LFN条目处理完毕,继续下一个条目,等待其对应的短文件名条目
# 如果不是LFN条目,或者是一个已删除的LFN条目后的短文件名条目
# 这时应该是短文件名条目 (SFN)
parsed_sfn_entry = parse_directory_entry(entry_data) # 解析短文件名条目
if parsed_sfn_entry and parsed_sfn_entry["type"] == "file_or_dir":
sfn_name = parsed_sfn_entry["filename"] # 获取短文件名
is_deleted_sfn = parsed_sfn_entry["is_deleted"] # 获取是否已删除
# 尝试组合LFN (如果存在)
if lfn_parts: # 如果LFN片段列表不为空
# LFN是倒序收集的,所以需要反转
lfn_parts.reverse() # 反转LFN片段列表
long_filename = "".join(lfn_parts) # 拼接成长文件名
lfn_parts = [] # 清空LFN片段列表以备下一个文件
actual_filename_to_use = long_filename # 使用长文件名
# print(f"Debug: Combined LFN: {long_filename} for SFN: {sfn_name}") # 调试信息
else:
actual_filename_to_use = sfn_name # 使用短文件名
# 清理文件名中的非法字符,特别是针对已删除文件名的第一个'?'
if is_deleted_sfn and actual_filename_to_use.startswith("?"): # 如果是已删除文件且以'?'开头
actual_filename_to_use = "_" + actual_filename_to_use[1:] # 替换'?'为'_'
# 替换路径中不安全的字符
safe_filename = "".join(c if c.isalnum() or c in (' ', '.', '_', '-') else '_' for c in actual_filename_to_use).strip() # 生成安全的文件名
if not safe_filename: # 如果安全文件名为空
safe_filename = f"unnamed_file_{
dir_entry_offset}" # 生成默认文件名
# 如果是 "." 或 ".." 目录项,跳过
if safe_filename == "." or safe_filename == "..": # 如果是特殊目录项
lfn_parts = [] # 清空LFN,因为这些条目通常没有LFN
continue # 继续下一个条目
# 构建恢复文件的完整输出路径
current_output_path_for_item = os.path.join(output_dir_base, current_path) # 获取当前项的输出路径
if not os.path.exists(current_output_path_for_item): # 如果路径不存在
try:
os.makedirs(current_output_path_for_item, exist_ok=True) # 创建路径
except OSError as ose:
print(f"[!] 创建目录 {
current_output_path_for_item} 失败: {
ose}") # 打印错误
lfn_parts = [] # 清空LFN
continue # 继续
output_file_path = os.path.join(current_output_path_for_item, safe_filename) # 构建最终输出文件路径
if parsed_sfn_entry["is_directory"]: # 如果是目录
# (子)目录处理
child_dir_start_cluster = parsed_sfn_entry["first_cluster"] # 获取子目录起始簇号
if child_dir_start_cluster >= 2 and not is_deleted_sfn : # 如果簇号有效且目录未被标记为删除 (递归通常针对未删除的目录结构)
# 为递归调用创建新的相对路径
new_relative_path = os.path.join(current_path, safe_filename) # 构建新的相对路径
print(f"[*] 发现子目录: {
safe_filename} (簇: {
child_dir_start_cluster}), 准备递归进入...") # 打印信息
scan_directory_recursive(disk_handle, bpb, fat_table, child_dir_start_cluster, new_relative_path, output_dir_base) # 递归扫描
elif is_deleted_sfn: # 如果是已删除的目录
print(f"[*] 发现已删除的目录: {
safe_filename} (簇: {
child_dir_start_cluster}) - 暂不递归恢复已删除目录结构。") # 打印信息
elif is_deleted_sfn: # 如果是已删除的文件
# 文件恢复逻辑 (只恢复已删除的文件)
start_cluster = parsed_sfn_entry["first_cluster"] # 获取起始簇号
file_size = parsed_sfn_entry["file_size"] # 获取文件大小
if start_cluster >= 2 and file_size > 0 : # 如果簇号有效且文件大小大于0
# 为避免文件名冲突,可以加上唯一标识或检查
recovery_attempt_filename = f"recovered_{
safe_filename}" # 构建恢复文件名
final_output_path = os.path.join(current_output_path_for_item, recovery_attempt_filename) # 构建最终输出路径
# 确保文件名不会太长或包含重复的recovery前缀
count = 0
base_final_output_path = final_output_path
while os.path.exists(final_output_path) and count < 100: # 如果文件已存在,尝试添加后缀
final_output_path = f"{
base_final_output_path}_{
count}"
count += 1
print(f"[*] 发现已删除文件: '{
safe_filename}' 在路径 '{
current_path}', 起始簇: {
start_cluster}, 大小: {
file_size}") # 打印信息
recover_fragmented_file(disk_handle, bpb, fat_table, start_cluster, file_size, final_output_path) # 尝试恢复文件
lfn_parts = [] # 处理完一个SFN后,清空LFN buffer
elif parsed_sfn_entry and parsed_sfn_entry["type"] == "end_of_directory": # 如果是目录结束标记
print(f"[*] 到达目录 '{
current_path}' (簇 {
dir_start_cluster}) 的末尾。") # 打印信息
return # 结束当前目录扫描
else: # 其他无效条目或未处理的条目类型
lfn_parts = [] # 清空LFN buffer
# print(f"Debug: Skipping entry or unhandled type: {entry_data[0]:02X}")
dir_entry_offset += 1 # 目录条目偏移量加1
def main_recovery_orchestrator(disk_path, output_dir_base):
"""主恢复协调函数"""
if not os.path.exists(output_dir_base): # 如果输出目录不存在
os.makedirs(output_dir_base) # 创建输出目录
disk_handle = None # 初始化磁盘句柄
try:
disk_handle = open(disk_path, 'rb') # 以二进制读模式打开磁盘
print(f"[*] 成功打开磁盘: {
disk_path}") # 打印成功信息
boot_sector_data = read_sector(disk_handle, 0) # 读取引导扇区数据
if not boot_sector_data: # 如果读取失败
print("[!] 无法读取引导扇区。") # 打印错误信息
return # 结束函数
bpb = FAT32BPB(boot_sector_data) # 解析BPB
print(f"[*] BPB解析完成: 每扇区字节数={
bpb.bytes_per_sector}, 每簇扇区数={
bpb.sectors_per_cluster}, 数据区起始扇区={
bpb.first_data_sector}, 根目录簇={
bpb.root_dir_first_cluster}") # 打印BPB信息
fat_table = load_fat_table(disk_handle, bpb) # 加载FAT表
if not fat_table: # 如果加载失败
print("[!] 无法加载FAT表。") # 打印错误信息
return # 结束函数
# 从根目录开始递归扫描
scan_directory_recursive(disk_handle, bpb, fat_table, bpb.root_dir_first_cluster, "", output_dir_base) # "" 表示根路径
except FileNotFoundError:
print(f"[!] 磁盘路径 {
disk_path} 未找到。") # 打印文件未找到错误
except PermissionError:
print(f"[!] 权限不足,无法打开磁盘 {
disk_path}。请尝试以管理员权限运行。") # 打印权限错误
except Exception as e:
print(f"[!] 发生未知错误: {
e}") # 打印未知错误
import traceback # 导入traceback模块
traceback.print_exc() # 打印详细的堆栈跟踪信息
finally:
if disk_handle: # 如果磁盘句柄存在
disk_handle.close() # 关闭磁盘句柄
print(f"[*] 已关闭磁盘: {
disk_path}") # 打印关闭信息
# --- 使用示例 (调用主协调函数) ---
if __name__ == "__main__":
# !! 警告: 直接操作磁盘非常危险,请务必小心 !!
# disk_to_scan = r'\.X:' # 将 X 替换为你的U盘盘符 (Windows, 需要管理员权限)
# output_recovery_dir = "D:/fat32_recursive_recovered"
# print("警告:此脚本将尝试直接读取磁盘扇区。请确保目标磁盘路径正确,并理解潜在风险。") # 打印警告
# print("强烈建议先在磁盘镜像上测试。") # 打印建议
# proceed = input("是否继续进行递归扫描和恢复? (yes/no): ") # 提示用户确认
# if proceed.lower() == 'yes': # 如果用户确认
# main_recovery_orchestrator(disk_to_scan, output_recovery_dir) # 执行主恢复协调函数
# else:
# print("操作已取消。") # 打印取消信息
print("递归扫描和恢复的代码示例已提供。请在理解代码和风险后,替换disk_to_scan和output_recovery_dir路径,并在安全的环境下运行。") # 打印最终提示
递归扫描和LFN处理的改进说明:
scan_directory_recursive 函数:
接收 dir_start_cluster (当前要扫描目录的起始簇)、current_path (用于构建恢复文件的目录结构)、和 output_dir_base (总的输出根目录)。
LFN 处理逻辑 (简化版):
当遇到LFN条目 (attribute & 0x0F == 0x0F 且第一个字节不是删除标记 0xE5) 时,它会提取LFN字符片段并将其存储在 lfn_parts 列表中。
LFN条目是按其序列号倒序存储的,且在对应的短文件名(SFN)条目之前。当脚本遇到一个SFN条目时,它会检查 lfn_parts 是否有内容。如果有,它会将收集到的片段反转并连接起来形成完整的长文件名。
actual_filename_to_use 会优先使用LFN,否则使用SFN。
对已删除文件的文件名(通常以 0xE5 开头,被 parse_directory_entry 替换为 ?)做了简单处理,将 ? 替换为 _。
文件名中移除了不安全的字符,以确保可以作为有效的文件/目录名保存。
递归调用:
当遇到一个未删除的目录条目时 ( parsed_sfn_entry["is_directory"] 为 True 且 not is_deleted_sfn ),它会获取该子目录的起始簇号,并用新的相对路径递归调用 scan_directory_recursive。
这允许脚本遍历整个文件系统的目录树。
已删除文件的恢复: 当遇到一个已删除的SFN文件条目 (is_deleted_sfn 为 True 且不是目录),并且其起始簇号和文件大小有效时,调用 recover_fragmented_file 进行恢复。恢复的文件会放在其原始(相对)目录结构下。
目录结束: first_char_filename == 0x00 标记目录条目列表的结束。
main_recovery_orchestrator 函数:
作为顶层函数,初始化磁盘访问、BPB解析、FAT表加载。
调用 scan_directory_recursive,从根目录 (bpb.root_dir_first_cluster) 和空的基本路径 ("") 开始。
递归和LFN的复杂性与注意事项:
真正的LFN解析: 上述LFN处理是一个简化版本。一个完全符合规范的LFN解析器需要:
检查LFN条目的序列号 (第一个字节,高位是 LAST_LONG_ENTRY 标志,低位是序号)。
验证LFN条目的校验和 (第13个字节) 是否与对应的SFN的校验和匹配。
正确处理LFN条目横跨扇区边界的情况。
更鲁棒地处理 0x0000 和 0xFFFF 填充字符。
恢复已删除的目录结构: 代码目前主要递归遍历未删除的目录结构来找到已删除的文件。恢复已删除的目录本身(即,如果一个目录条目被标记为 0xE5)是一个更复杂的问题。因为目录的内容(子文件和子目录列表)也存储在簇中,如果这个目录的起始簇号仍然有效,理论上可以尝试恢复其内容,但这需要更复杂的逻辑来判断其原始大小和内容。
簇链的有效性: 递归依赖于目录的簇链是完整的。如果目录本身的簇链损坏,递归可能无法完整遍历。
5.4.2 更智能的簇读取与文件重组
“更智能”可以体现在多个方面:
错误容忍:
当 read_sector 失败时,目前是直接跳过或中止。更智能的做法可能是尝试多次读取,或者记录坏扇区并尝试跳过它来读取簇中剩余的好扇区。
当 get_cluster_chain 遇到坏簇 (0x0FFFFFF7) 或意外的空闲簇时,目前是中止簇链。可以考虑:
如果文件大小已知,记录已读取的数据,并报告文件可能不完整。
尝试启发式方法寻找可能的下一个簇(非常高级,通常用于数据雕刻)。
基于文件大小的簇读取优化:
在 recover_fragmented_file 中,我们根据 file_size 来限制读取的总字节数。这是一个好做法。
可以进一步优化:如果 file_size 小于一个簇的大小,并且这是文件的最后一个簇,则只读取 file_size % bpb.bytes_per_cluster 那么多字节,而不是整个簇(尽管读取整个簇然后截断也是可以的)。
处理FAT表不一致:
FAT32通常有两个FAT表作为备份。如果主FAT表(FAT1)中的某个条目看起来可疑(例如,形成了一个不合理的短循环,或者指向一个明显错误的簇区域),可以尝试从备份FAT表(FAT2)中读取相应的条目进行交叉验证。这需要知道FAT2的起始位置(通常紧跟在FAT1之后)。
bpb.num_fats 告诉我们有多少个FAT表。bpb.sectors_per_fat 告诉我们每个FAT表多大。FAT2的起始扇区 = bpb.reserved_sectors + bpb.sectors_per_fat。
文件校验和/签名验证 (针对已知文件类型):
如果我们在恢复特定类型的文件(如JPEG, PNG, DOCX),并且知道这些文件类型的内部结构或校验和机制,可以在恢复后尝试验证文件的完整性和正确性。这不是簇读取本身,而是恢复后的验证步骤。
示例:在 get_cluster_chain 中考虑坏簇标记
我们已经在 get_cluster_chain 中加入了对 0x0FFFFFF7 (坏簇) 的判断。当遇到坏簇时,簇链会中断。这意味着如果一个文件的一部分恰好落在一个被标记为坏簇的区域,那么该文件只能恢复到坏簇之前的部分。
# 在 get_cluster_chain 函数中,对坏簇的处理:
# ...
next_cluster_raw_value = fat_table[current_cluster] # 从FAT表中获取当前簇对应的条目值
if next_cluster_raw_value >= 0x0FFFFFF8: # 如果是文件结束标记
break # 跳出循环
elif next_cluster_raw_value == 0x0FFFFFF7: # 如果是坏簇标记
print(f"[!] 簇链在坏簇 {
current_cluster} (其FAT条目指向坏区) 处中断。文件可能不完整。") # 打印警告信息
# 坏簇意味着这个簇的数据不可读或不可靠,所以链条到此为止
break # 跳出循环
elif next_cluster_raw_value == 0x00000000: # 如果是空闲簇
print(f"[!] 簇链在空闲簇标记处 (簇 {
current_cluster} 指向 0) 中断。文件可能已损坏或被截断。") # 打印警告信息
break # 跳出循环
else:
current_cluster = next_cluster_raw_value # 更新当前簇号
if current_cluster < 2: # 如果下一个簇号无效
print(f"[!] 簇链中出现无效的下一个簇号: {
current_cluster} (来自簇 {
cluster_chain[-2] if len(cluster_chain)>1 else 'start'})") # 打印错误信息
break # 跳出循环
# ...
这个改动确保了当FAT表明确指出一个簇是坏簇时,我们不会尝试去读取它或将其作为链的一部分。恢复出来的文件将只包含坏簇之前的数据。
关于“更智能的簇读取”的思考:
真正的“智能”往往涉及到对文件内容本身的理解。例如:
对于文本文件:如果一个簇链恢复出来的文本突然中断,并且下一个可用的空闲簇(如果可以找到的话)包含看起来像是文本接续的内容,启发式算法可能会尝试将它们连接起来。
对于特定格式文件:例如JPEG文件,它们有特定的SOI(Start of Image)和EOI(End of Image)标记。如果一个簇链恢复的数据没有EOI,但磁盘上其他地方找到了一个似乎匹配的EOI片段,可能会尝试拼接。这就是数据雕刻的雏形。
这些高级的“智能”方法超出了基于文件系统元数据(如FAT表)恢复的范畴,更多地依赖于模式识别和启发式规则。

















暂无评论内容