基于知识图谱和Milvus的元数据入库流程

目录

1. 概要

2. 整体架构流程

2.1 技术架构特点

2.2 核心功能

3. 知识图谱提取

4. 知识图谱转Excel

5. Excel后处理

6. Milvus向量数据库录入

7. 数据库检索


1. 概要

这是一个文档向量化存储系统,主要用于将文本数据处理后存储到Milvus向量数据库中,为后续的检索增强生成(RAG)应用提供数据基础。

2. 整体架构流程

2.1 技术架构特点

1)模型集成
LLM模型:Qwen2.5-7B-Instruct用于知识抽取
嵌入模型:BCE-embedding-base_v1用于向量化
向量数据库:Milvus用于高效检索
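
入库前可以先确认两个模型服务是否可用。下面是一个简单的连通性检查草稿(端口与 model 字段以实际部署为准,示例沿用下文脚本中的 7789 与 9996 端口):

import requests

# 检查 vLLM 部署的 Qwen2.5-7B-Instruct 服务(OpenAI 兼容接口)
print(requests.get("http://127.0.0.1:7789/v1/models").json())

# 检查 BCE-embedding 服务(OpenAI 兼容接口,model 字段按服务端配置填写)
resp = requests.post("http://127.0.0.1:9996/v1/embeddings",
                     json={"input": "测试文本", "model": "gpt-4"},
                     headers={"Content-Type": "application/json"})
print(len(resp.json()["data"][0]["embedding"]))  # BCE-embedding-base_v1 的向量维度应为 1024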

2)数据流设计

Markdown文档 → 知识图谱 → 结构化数据 → 向量化 → Milvus存储

graph TD
    A[电力技术文档.md] --> B[LightRAG知识图谱提取]
    B --> C[实体关系GraphML文件]
    C --> D[转换为Excel格式]
    D --> E[数据清洗和标准化]
    E --> F[BCE向量化]
    F --> G[Milvus数据库存储]
    G --> H[混合检索服务]

    I[用户查询] --> H
    H --> J[检索结果]

3)文本提取完整流程

graph TD
    A[书籍.pdf] --> B[PDF转MD]
    B --> C[书籍.md]
    C --> D[LightRAG知识图谱提取]
    D --> E[kv_store_text_chunks.json<br/>graph_chunk_entity_relation.graphml]
    E --> F[转换Excel格式]
    F --> G[substation_network_nodes.xlsx<br/>substation_network_edges.xlsx]
    G --> H[数据预处理过滤]
    H --> I[kv_store_text_chunks_helpful.xlsx]
    I --> J[生成摘要]
    J --> K[重要书籍确认.xlsx]
    K --> L[数据合并]
    L --> M[kv_store_text_chunks_final.xlsx]
    M --> N[BCE向量化]
    N --> O[Milvus数据库存储]

2.2 核心功能

1. 知识图谱构建:从Markdown技术文档中提取实体和关系
2. 知识图谱后处理:图谱优化、实体对齐、关系验证
3. 数据结构化:将知识转换为Excel格式便于分析
4. 向量化存储:将处理后的知识存储到Milvus向量数据库
5. 知识库检索:检索知识库内容
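
结合上述核心功能,整条流水线的调用顺序大致如下(示意草稿:三个函数分别来自下文的 building_kg_using_lightrag.py、lightrag_processing.py 与 propositioning.py,假设它们可作为模块导入,路径需按实际环境调整):

import glob
import networkx as nx

# from building_kg_using_lightrag import building_kg
# from lightrag_processing import networkx_to_excel
# from propositioning import post_process

book_path = '/home/markdown_new_books/*.md'   # OCR 后的 md 文档
working_dir = '/data/test_books/'             # 知识图谱输出目录

# 1) 知识图谱提取(LightRAG)
building_kg(book_path, working_dir)

# 2) 知识图谱转 Excel
G = nx.read_graphml(f'{working_dir}/graph_chunk_entity_relation.graphml')
networkx_to_excel(G,
                  f'{working_dir}/substation_network_nodes.xlsx',
                  f'{working_dir}/substation_network_edges.xlsx')

# 3) Excel 后处理(过滤、书名映射、入库前整理)
post_process(base_dir=working_dir, origin_book_path_list=glob.glob(book_path))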

3. 知识图谱提取

(1)输入:PDF OCR 后的 md 文档
(2)输出:
        1)kv_store_text_chunks.json:以固定 token 长度截取出来的 chunks
        2)kv_store_doc_status.json: 存储所有 md 文档的信息
        3)kv_store_full_docs.json:存储所有 md 文档的内容
        4)graph_chunk_entity_relation.graphml: GraphML 格式的知识图谱文件,可用于可视化
        5)vdb_chunks.json:存储所有 chunkid
        6)vdb_relationships.json:提取到的关系,即图的 edge
        7)vdb_entities.json:提取到的实体,即图的 Node
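
作为参考,下面是一段检查上述输出文件的示例草稿(假设文件均位于知识图谱提取时的 working_dir 下,字段名以实际生成文件为准):

import json
import networkx as nx

working_dir = '/data/test_books/'

# chunks:key 为 chunk id,value 含 content、tokens、full_doc_id、chunk_order_index 等字段
with open(f'{working_dir}/kv_store_text_chunks.json') as f:
    chunks = json.load(f)
print('chunk 数量:', len(chunks))

# 知识图谱:节点带 entity_type、description、source_id 等属性,边带 weight、description 等属性
G = nx.read_graphml(f'{working_dir}/graph_chunk_entity_relation.graphml')
print('实体数量:', G.number_of_nodes(), '关系数量:', G.number_of_edges())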

# 使用LightRAG从Markdown文档构建知识图谱
building_kg(book_path, working_dir)

实现细节:

使用LightRAG框架,配合Qwen2.5-7B模型进行实体和关系抽取

定义了专门的实体类型:[“设备或部件名称”, “隐患或故障现象”, “检测工具”, “规章制度”]

支持异步处理,提高抽取效率

生成GraphML格式的知识图谱文件

完整代码:

### building_kg_using_lightrag.py

import asyncio
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc
from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data, initialize_pipeline_status
from tqdm import tqdm
import numpy as np
import glob
import os
from lightrag.prompt import PROMPTS
import time


PROMPTS["DEFAULT_ENTITY_TYPES"] = ["设备或部件名称", "隐患或故障现象", "检测工具", "规章制度"]
PROMPTS["entity_extraction"] = """-Goal-
给定一份可能与此活动相关的文本文档和一份实体类型列表,从文本中识别出这些类型的所有实体以及已识别实体之间的所有关系。

-Steps-
1. 识别所有实体。针对每个已识别的实体,提取以下信息:
- entity_name: 实体名称,大写
- entity_type: 以下类型之一: [{entity_types}]
- entity_description: 对实体属性和活动的全面描述
将每个实体格式化为 ("entity"{tuple_delimiter}<entity_name>{tuple_delimiter}<entity_type>{tuple_delimiter}<entity_description>)

2. 从步骤 1 中确定的实体中,找出彼此*明确相关*的所有(源实体、目标实体)对。
对于每一对相关实体,提取以下信息:
- source_entity:源实体的名称,如步骤 1 所确定的
- target_entity:在步骤 1 中确定的目标实体名称
- relationship_description:解释您认为源实体和目标实体相互关联的原因
- relationship_strength:表示源实体和目标实体之间关系强度的数字分数
- relationship_keywords:一个或多个高级关键词,概括关系的总体性质,侧重于概念或主题,而不是具体细节。
将每个关系格式化为("relationship"{tuple_delimiter}<source_entity>{tuple_delimiter}<target_entity>{tuple_delimiter}<relationship_description>{tuple_delimiter}<relationship_keywords>{tuple_delimiter}<relationship_strength>)

3. 找出概括全文主要概念、主题或话题的高级关键词。这些关键词应能捕捉到文件中的总体思想。
将内容级关键词格式化为 ("content_keywords"{tuple_delimiter}<high_level_keywords>)。

4. 返回中文输出,即步骤 1 和 2 中确定的所有实体和关系的单个列表。使用 **{record_delimiter}** 作为列表分隔符。

5. 完成后,输出{completion_delimiter}。

######################
-Examples-
######################
Example 1:

Entity_types: [person, technology, mission, organization, location]
Text:
while Alex clenched his jaw, the buzz of frustration dull against the backdrop of Taylor's authoritarian certainty. It was this competitive undercurrent that kept him alert, the sense that his and Jordan's shared commitment to discovery was an unspoken rebellion against Cruz's narrowing vision of control and order.

Then Taylor did something unexpected. They paused beside Jordan and, for a moment, observed the device with something akin to reverence. "If this tech can be understood..." Taylor said, their voice quieter, "It could change the game for us. For all of us."

The underlying dismissal earlier seemed to falter, replaced by a glimpse of reluctant respect for the gravity of what lay in their hands. Jordan looked up, and for a fleeting heartbeat, their eyes locked with Taylor's, a wordless clash of wills softening into an uneasy truce.

It was a small transformation, barely perceptible, but one that Alex noted with an inward nod. They had all been brought here by different paths
################
Output:
("entity"{tuple_delimiter}"Alex"{tuple_delimiter}"person"{tuple_delimiter}"Alex是一个经历过挫折、善于观察其他角色动态的人物。"){record_delimiter}
("entity"{tuple_delimiter}"Taylor"{tuple_delimiter}"person"{tuple_delimiter}"Taylor被描绘成具有权威性的确定性,并对一个设备表现出敬仰,这表明了观点的转变。"){record_delimiter}
("entity"{tuple_delimiter}"Jordan"{tuple_delimiter}"person"{tuple_delimiter}"Jordan对发现持有承诺,并与Taylor关于一个设备有重要的互动。"){record_delimiter}
("entity"{tuple_delimiter}"Cruz"{tuple_delimiter}"person"{tuple_delimiter}"Cruz与控制与秩序的愿景相关联,影响其他角色之间的动态。"){record_delimiter}
("entity"{tuple_delimiter}"The Device"{tuple_delimiter}"technology"{tuple_delimiter}"The Device是故事的核心,具有可能改变游戏规则的影响,并且被Taylor所敬仰。"){record_delimiter}
("relationship"{tuple_delimiter}"Alex"{tuple_delimiter}"Taylor"{tuple_delimiter}"Alex受到Taylor权威确定性的影响,并观察到Taylor对设备的看法发生了变化。"{tuple_delimiter}"power dynamics, perspective shift"{tuple_delimiter}7){record_delimiter}
("relationship"{tuple_delimiter}"Alex"{tuple_delimiter}"Jordan"{tuple_delimiter}"Alex和Jordan对发现持有共同的承诺,这与Cruz的愿景形成对比。"{tuple_delimiter}"shared goals, rebellion"{tuple_delimiter}6){record_delimiter}
("relationship"{tuple_delimiter}"Taylor"{tuple_delimiter}"Jordan"{tuple_delimiter}"Taylor和Jordan直接关于设备进行互动,导致相互尊重和不安的休战。"{tuple_delimiter}"conflict resolution, mutual respect"{tuple_delimiter}8){record_delimiter}
("relationship"{tuple_delimiter}"Jordan"{tuple_delimiter}"Cruz"{tuple_delimiter}"Jordan对发现的承诺是对Cruz控制与秩序愿景的反叛。"{tuple_delimiter}"ideological conflict, rebellion"{tuple_delimiter}5){record_delimiter}
("relationship"{tuple_delimiter}"Taylor"{tuple_delimiter}"The Device"{tuple_delimiter}"Taylor对设备表现出敬仰,表明了它的重要性和潜在影响。"{tuple_delimiter}"reverence, technological significance"{tuple_delimiter}9){record_delimiter}
("content_keywords"{tuple_delimiter}"power dynamics, ideological conflict, discovery, rebellion"){completion_delimiter}
#############################
Example 2:

Entity_types: [person, technology, mission, organization, location]
Text:
They were no longer mere operatives; they had become guardians of a threshold, keepers of a message from a realm beyond stars and stripes. This elevation in their mission could not be shackled by regulations and established protocols—it demanded a new perspective, a new resolve.

Tension threaded through the dialogue of beeps and static as communications with Washington buzzed in the background. The team stood, a portentous air enveloping them. It was clear that the decisions they made in the ensuing hours could redefine humanity's place in the cosmos or condemn them to ignorance and potential peril.

Their connection to the stars solidified, the group moved to address the crystallizing warning, shifting from passive recipients to active participants. Mercer's latter instincts gained precedence— the team's mandate had evolved, no longer solely to observe and report but to interact and prepare. A metamorphosis had begun, and Operation: Dulce hummed with the newfound frequency of their daring, a tone set not by the earthly
#############
Output:
("entity"{tuple_delimiter}"Washington"{tuple_delimiter}"location"{tuple_delimiter}"华盛顿是一个接收通信的位置,这表明它在决策过程中的重要性。"){record_delimiter}
("entity"{tuple_delimiter}"Operation: Dulce"{tuple_delimiter}"mission"{tuple_delimiter}"行动:杜尔塞被描述为一个任务,该任务已发展为进行互动和准备,表明目标和活动发生了重大转变。"){record_delimiter}
("entity"{tuple_delimiter}"The team"{tuple_delimiter}"organization"{tuple_delimiter}"团队被描绘为一群个体,他们已从被动观察者转变为任务中的积极参与者,显示了他们角色的动态变化。"){record_delimiter}
("relationship"{tuple_delimiter}"The team"{tuple_delimiter}"Washington"{tuple_delimiter}"团队从华盛顿接收通信,这影响了他们的决策过程。"{tuple_delimiter}"decision-making, external influence"{tuple_delimiter}7){record_delimiter}
("relationship"{tuple_delimiter}"The team"{tuple_delimiter}"Operation: Dulce"{tuple_delimiter}"团队直接参与行动:杜尔塞,执行其发展目标的活动。"{tuple_delimiter}"mission evolution, active participation"{tuple_delimiter}9){completion_delimiter}
("content_keywords"{tuple_delimiter}"mission evolution, decision-making, active participation, cosmic significance"){completion_delimiter}
#############################
Example 3:

Entity_types: [person, role, technology, organization, event, location, concept]
Text:
4.3 外绝缘破损
4.3.1 现象
外绝缘表面有破损、开裂、缺胶、杂质、凸起等。
4.3.2 处理原则
a)判断外绝缘表面缺陷的面积和深度。
b)查看避雷器外绝缘的放电情况,有无火花、放电痕迹。
c)巡视时应注意与避雷器设备保持足够的安全距离,应远离避雷器进行观察。
d)发现避雷器外绝缘破损、开裂等,需要更换外绝缘时,应汇报值班调控人员申请停运处理。
4.4 本体炸裂、引线脱落接地
4.4.1 现象
中性点有效接地系统
a)监控系统发出相关保护动作、断路器跳闸变位信息,相关电压、电流、功率显示为零;
b)相关保护装置发出动作信息;
c)避雷器本体损坏、引线脱落。
中性点非有效接地系统
a)监控系统发出母线接地告警信息;
b)相应母线电压表指示:接地相电压降低,其它两相电压升高;
c)避雷器本体损坏、引线脱落。
4.4.2 处理原则
a)检查记录监控系统告警信息,现场记录有关保护及自动装置动作情况。
b)现场查看避雷器损坏、引线脱落情况和临近设备外绝缘的损伤状况,核对一次设备动作情况。
c)查找故障点,判明故障原因后,立即将现场情况汇报值班调控人员,按照值班调控人员指令隔离故障,联系检修人员处理。
d)查找中性点非有效接地系统接地故障时,应遵守《国家电网公司电力安全工作规程(变电部分)》规定,与故障设备保持足够的安全距离,防止跨步电压伤人。
4.5 绝缘闪络
4.5.1 现象
中性点有效接地系统
a)监控系统发出相关保护动作、断路器跳闸变位信息,相关电压、电流、功率显示为零;
b)相关保护装置发出动作信息;
c)避雷器外绝缘有放电痕迹,接地引下线或有放电痕迹。
中性点非有效接地系统
#############
Output:
("entity"{tuple_delimiter}"避雷器"{tuple_delimiter}"设备名称"{tuple_delimiter}"避雷器用于保护电力系统设备免受雷击和过电压影响。"){record_delimiter}  
("entity"{tuple_delimiter}"变压器"{tuple_delimiter}"设备名称"{tuple_delimiter}"变压器用于改变电压等级,是电力系统中的关键设备。"){record_delimiter}  
("entity"{tuple_delimiter}"外绝缘"{tuple_delimiter}"设备部件名称"{tuple_delimiter}"外绝缘是电力设备的保护层,用于提供绝缘和防护功能,防止环境影响。"){record_delimiter}  
("entity"{tuple_delimiter}"避雷器外绝缘"{tuple_delimiter}"设备部件名称"{tuple_delimiter}"避雷器外绝缘是避雷器的重要组成部分,提供绝缘并保护内部部件免受外部环境的影响。"){record_delimiter}  
("entity"{tuple_delimiter}"避雷器引线"{tuple_delimiter}"设备部件名称"{tuple_delimiter}"避雷器引线是连接避雷器与电网的重要部件,用于导通雷击电流至接地系统。"){record_delimiter}  
("entity"{tuple_delimiter}"变压器绕组"{tuple_delimiter}"设备部件名称"{tuple_delimiter}"变压器绕组是变压器的核心组件,负责电压变换和能量传递。"){record_delimiter}  
("entity"{tuple_delimiter}"外绝缘破损"{tuple_delimiter}"故障现象"{tuple_delimiter}"外绝缘破损是指绝缘层受损,如开裂或老化,可能导致局部放电或绝缘性能下降。"){record_delimiter}  
("entity"{tuple_delimiter}"绝缘闪络"{tuple_delimiter}"故障现象"{tuple_delimiter}"绝缘闪络是由于绝缘性能下降而产生的放电现象,可能导致设备短路或失效。"){record_delimiter}  
("entity"{tuple_delimiter}"变压器过热"{tuple_delimiter}"故障现象"{tuple_delimiter}"变压器过热是因负载过大或冷却不足导致的温度升高,可能损坏绕组绝缘。"){record_delimiter}  
("entity"{tuple_delimiter}"监控系统"{tuple_delimiter}"检测工具"{tuple_delimiter}"监控系统用于实时监测电力设备状态,包括温度、电流、电压等,及时识别故障。"){record_delimiter}  
("relationship"{tuple_delimiter}"避雷器外绝缘"{tuple_delimiter}"外绝缘破损"{tuple_delimiter}"避雷器外绝缘破损可能引发局部放电,降低避雷器的绝缘能力并增加设备故障风险。"{tuple_delimiter}"insulation failure, discharge"{tuple_delimiter}8){record_delimiter}  
("relationship"{tuple_delimiter}"外绝缘破损"{tuple_delimiter}"绝缘闪络"{tuple_delimiter}"外绝缘破损可能导致绝缘闪络,进而引发设备放电和功能失效。"{tuple_delimiter}"flashover risk, failure propagation"{tuple_delimiter}9){record_delimiter}  
("relationship"{tuple_delimiter}"避雷器"{tuple_delimiter}"变压器"{tuple_delimiter}"避雷器失效可能导致雷击能量直接作用于变压器,引起绕组过热或绝缘损坏。"{tuple_delimiter}"overvoltage, thermal damage"{tuple_delimiter}7){record_delimiter}  
("relationship"{tuple_delimiter}"绝缘闪络"{tuple_delimiter}"变压器过热"{tuple_delimiter}"绝缘闪络可能引起变压器绕组过载,从而导致过热现象。"{tuple_delimiter}"overload, thermal rise"{tuple_delimiter}8){record_delimiter}  
("relationship"{tuple_delimiter}"变压器过热"{tuple_delimiter}"监控系统"{tuple_delimiter}"变压器过热会触发监控系统报警,帮助维护人员及时发现并解决问题。"{tuple_delimiter}"temperature monitoring, alert"{tuple_delimiter}9){record_delimiter}  
("relationship"{tuple_delimiter}"避雷器引线"{tuple_delimiter}"监控系统"{tuple_delimiter}"避雷器引线脱落会被监控系统识别为连接异常,并触发相应的故障报警。"{tuple_delimiter}"fault detection, response"{tuple_delimiter}8){record_delimiter}  
("relationship"{tuple_delimiter}"外绝缘破损"{tuple_delimiter}"避雷器引线"{tuple_delimiter}"外绝缘破损引起放电可能导致避雷器引线过热或烧损,影响其导电性能。"{tuple_delimiter}"heat damage, connection failure"{tuple_delimiter}7){record_delimiter}  
("content_keywords"{tuple_delimiter}"避雷器, 变压器, 外绝缘破损, 绝缘闪络, 变压器过热, 故障传递, 监控系统"){completion_delimiter}
#############################
-Real Data-
######################
Entity_types: {entity_types}
Text: {input_text}
######################
Output:
"""
PROMPTS["keywords_extraction"] = """---Role---

You are a helpful assistant tasked with identifying both high-level and low-level keywords in the user's query.

---Goal---

Given the query, list both high-level and low-level keywords. High-level keywords focus on overarching concepts or themes, while low-level keywords focus on specific entities, details, or concrete terms.

---Instructions---

- Output the keywords in JSON format.
- The JSON should have two keys:
  - "high_level_keywords" for overarching concepts or themes.
  - "low_level_keywords" for specific entities or details.

######################
-Examples-
######################
Example 1:

Query: "国际贸易如何影响全球经济稳定?"
################
Output:
{{
  "high_level_keywords": ["国际贸易", "全球经济稳定", "经济影响"],
  "low_level_keywords": ["贸易协定", "关税", "货币兑换", "进口", "出口"]
}}
#############################
Example 2:

Query: "砍伐森林对生物多样性有哪些环境影响?"
################
Output:
{{
  "high_level_keywords": ["环境影响", "砍伐森林", "生物多样性丧失"],
  "low_level_keywords": ["物种灭绝", "栖息地破坏", "碳排放", "雨林", "生态系统"]
}}
#############################
Example 3:

Query: "教育在减贫中发挥什么作用?"
################
Output:
{{
  "high_level_keywords": ["教育", "减贫", "社会经济发展"],
  "low_level_keywords": ["入学机会", "识字率", "职业培训", "收入不平等"]
}}
#############################
-Real Data-
######################
Query: {query}
######################
Output:

"""

"""
CUDA_VISIBLE_DEVICES=3 nohup vllm serve /data1/workspace-lxr/pretrained_model_weights/llm_weights/qwen/Qwen2___5-7B-Instruct-GPTQ-Int4 --gpu-memory-utilization 0.60 --max_num_seqs 512 --max-model-len 30000 --host 0.0.0.0 --port 7767 > Qwen2___5-7B-Instruct-GPTQ-Int4.log &
"""

# 定义可用端口池
# AVAILABLE_PORTS = [7778, 7779, 7780, 7781, 7782, 7783]
AVAILABLE_PORTS = [7789]  
_current_port_index = 0
_port_lock = asyncio.Lock()

async def get_next_available_port():
    global _current_port_index
    async with _port_lock:
        current_port = AVAILABLE_PORTS[_current_port_index]
        _current_port_index = (_current_port_index + 1) % len(AVAILABLE_PORTS)
        return current_port

async def llm_model_func(
        prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
    max_retries = len(AVAILABLE_PORTS)
    retries = 0
    last_error = None

    while retries < max_retries:
        try:
            port = await get_next_available_port()
            return await openai_complete_if_cache(
                model="/data/workspace-fkh/.cache/modelscope/hub/models/Qwen/Qwen2.5-7B-Instruct-GPTQ-Int4/",
                prompt=prompt,
                system_prompt=system_prompt,
                history_messages=history_messages,
                api_key="gg",
                base_url=f"http://127.0.0.1:{port}/v1",
                **kwargs,
            )
        except Exception as e:
            last_error = e
            retries += 1
            await asyncio.sleep(0.1)  
            continue

    raise Exception(f"所有端口尝试均失败。最后的错误: {str(last_error)}")


"""
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{"input": "Hello, world!", "model": "gpt-4"}' \
  http://127.0.0.1:9996/v1/embeddings
"""


async def embedding_func(texts: list[str]) -> np.ndarray:
    return await openai_embed(
        texts,
        model="gpt-4",
        api_key="EMPTY",
        base_url="http://127.0.0.1:9996/v1",
    )


async def initialize_rag(working_dir): 
    rag = LightRAG(llm_model_max_token_size=10000,
                    chunk_token_size=600,
                    chunk_overlap_token_size=100,
                    working_dir=working_dir,
                    llm_model_func=llm_model_func,
                    embedding_func=EmbeddingFunc(embedding_dim=1024, max_token_size=512, func=embedding_func),
                    llm_model_max_async=50,
                    embedding_func_max_async=50)

    await rag.initialize_storages()
    await initialize_pipeline_status()
    return rag


def building_kg(book_path, working_dir):
    if not os.path.exists(working_dir):
        os.mkdir(working_dir)

    rag = asyncio.run(initialize_rag(working_dir))
    path_list = list(set(glob.glob(book_path)))
    extracted_book = []
    for path in path_list:
        with open(path) as f:
            extracted_book.append(f.read())

    print("Start KG extracting")
    start = time.time()
    # 注意:此处仅插入第一本书用于验证流程,全量入库时可改为 rag.insert(extracted_book)
    rag.insert(extracted_book[:1])
    end = time.time()
    print(f'Total time: {end - start}')


if __name__ == "__main__":
    # book_path = '/home/books/*.md'
    book_path = '/home/markdown_new_books/*.md'
    working_dir = '/data/test_books/'

    building_kg(book_path, working_dir)

    """
    hybrid,global,local,naive search
    top_k: int = 60
    max_token_for_text_unit: int = 4000
    max_token_for_global_context: int = 4000
    max_token_for_local_context: int = 4000
    """
"""
CUDA_VISIBLE_DEVICES=4 nohup vllm serve /data3/workspace-lxr/pretrained_model_weights/deepseek-ai/DeepSeek-R1-Distill-Qwen-32B --gpu-memory-utilization 0.98 --tensor-parallel-size 1 --port 7770 --max-model-len 10000 --enforce-eager > ./DeepSeek-R1-Distill-Qwen-32B.log &
"""

4. 知识图谱转Excel

(1)输入:知识图谱提取得到的 graph_chunk_entity_relation.graphml
(2)输出:
        1)substation_network_edges.xlsx: 存储所有关系 excel 表格
        2)substation_network_nodes.xlsx:存储所有实体的 excel 表格

# 将NetworkX图谱转换为Excel格式
networkx_to_excel(G, node_file, edge_file)

实现功能:

将GraphML图谱转换为节点表和边表

生成包含实体类型、关系描述、权重等信息的结构化数据

便于后续的数据分析和处理
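
导出后可以用 pandas 快速核对两张表的结构(示例草稿,列名基于下方 networkx_to_excel 的实现,路径按实际 working_dir 调整):

import pandas as pd

nodes_df = pd.read_excel('/data/test_books/substation_network_nodes.xlsx')
edges_df = pd.read_excel('/data/test_books/substation_network_edges.xlsx')

# 节点表:Node ID 加节点属性(entity_type、description、source_id 等)
print(nodes_df.columns.tolist())
# 边表:Source、Target、Source Type、Target Type 加边属性(weight、description、source_id 等)
print(edges_df.columns.tolist())
print('节点数:', len(nodes_df), '边数:', len(edges_df))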

完整代码:

### lightrag_processing.py


import networkx as nx
import pandas as pd
from llm_classifier_useing_vllm import llm_classifier
from pyvis.network import Network
from itertools import combinations
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
import os, requests, json
import warnings

warnings.filterwarnings('ignore')

# 优化后的路径查找模块
class PathFinder:
    def __init__(self, G, max_depth=3):
        self.G = G
        self.max_depth = max_depth
        self.phenomenon_nodes = [n for n, d in G.nodes(data=True) if d['type'] == '故障现象']
        self._build_node_cache()

    def _build_node_cache(self):
        """预缓存节点类型信息"""
        self.node_type_cache = nx.get_node_attributes(self.G, 'type')
        self.valid_middle_nodes = {
            n for n, t in self.node_type_cache.items()
            if t == '设备或部件名称'
        }

    def _is_valid_edge(self, u, v):
        """快速验证边有效性"""
        # 故障现象之间允许直连
        if (self.node_type_cache[u] == '故障现象' and
                self.node_type_cache[v] == '故障现象'):
            return True
        # 故障现象必须通过设备连接
        if (self.node_type_cache[u] == '故障现象' and
                self.node_type_cache[v] in ['设备或部件名称']):
            return True
        if (self.node_type_cache[v] == '故障现象' and
                self.node_type_cache[u] in ['设备或部件名称']):
            return True
        return False

    def find_paths_between(self, pair):
        """并行处理节点对路径查找"""
        s, t = pair
        paths = []
        visited = set()

        # 自定义DFS实现,带剪枝
        def dfs(current, path, depth):
            if depth > self.max_depth:
                return
            if current == t and len(path) >= 2:
                paths.append(path + [current])
                return

            visited.add(current)
            for neighbor in self.G.neighbors(current):
                if neighbor in visited:
                    continue
                if not self._is_valid_edge(current, neighbor):
                    continue  # 提前剪枝无效边
                if len(path) > 0:
                    # 验证中间节点类型
                    prev_type = self.node_type_cache[path[-1]]
                    curr_type = self.node_type_cache[neighbor]
                    if (prev_type == '故障现象' and
                            curr_type not in ['设备或部件名称', '故障现象']):
                        continue
                dfs(neighbor, path + [current], depth + 1)
            visited.remove(current)

        dfs(s, [], 0)
        return [p for p in paths if self._final_validation(p)]

    def _final_validation(self, path):
        """最终路径验证"""
        if len(path) < 2:
            return False
        if self.node_type_cache[path[0]] != '故障现象':
            return False
        if self.node_type_cache[path[-1]] != '故障现象':
            return False

        # 检查中间节点类型
        for node in path[1:-1]:
            if self.node_type_cache[node] != '设备或部件名称':
                return False
        return True

# 并行路径查找函数
def parallel_path_finding(G, max_workers=None):
    finder = PathFinder(G)
    pairs = list(combinations(finder.phenomenon_nodes, 2))

    # 使用进程池并行计算
    with ProcessPoolExecutor(max_workers=max_workers or os.cpu_count()) as executor:
        results = list(tqdm(
            executor.map(finder.find_paths_between, pairs),
            total=len(pairs),
            desc="并行路径查找"
        ))

    # 合并结果并去重
    all_paths = []
    seen = set()
    for path_list in results:
        for path in path_list:
            # 标准化路径表示
            if path[0] > path[-1]:  # 统一排序方向
                path = path[::-1]
            key = tuple(path)
            if key not in seen:
                seen.add(key)
                all_paths.append(path)
    return all_paths


def networkx_to_excel(G, node_file, edge_file):
    # 生成节点表
    nodes = [{"Node ID": n, **attr} for n, attr in G.nodes(data=True)]
    df_nodes = pd.DataFrame(nodes).fillna('')

    # 生成边表(添加实体类型)
    edges = []
    for u, v, attr in G.edges(data=True):
        edge_row = {
            "Source": u,
            "Target": v,
            "Source Type": G.nodes[u].get('entity_type', ''),
            "Target Type": G.nodes[v].get('entity_type', '')
        }
        edge_row.update(attr)
        edges.append(edge_row)
    df_edges = pd.DataFrame(edges).fillna('')

    # 使用xlsxwriter引擎同时保存数据和调整格式
    with pd.ExcelWriter(node_file, engine='xlsxwriter') as writer:
        df_nodes.to_excel(writer, index=False)
        worksheet = writer.sheets['Sheet1']
        for idx, col in enumerate(df_nodes):
            max_len = max(df_nodes[col].astype(str).map(len).max(), len(col)) + 1
            worksheet.set_column(idx, idx, max_len)

    with pd.ExcelWriter(edge_file, engine='xlsxwriter') as writer:
        df_edges.to_excel(writer, index=False)
        worksheet = writer.sheets['Sheet1']
        for idx, col in enumerate(df_edges):
            max_len = max(df_edges[col].astype(str).map(len).max(), len(col)) + 1
            worksheet.set_column(idx, idx, max_len)


def revers_hop2(excel_path):
    from tqdm import tqdm

    import pandas as pd
    import networkx as nx
    from openpyxl import Workbook
    from openpyxl.styles import PatternFill, Font
    from openpyxl.utils import get_column_letter

    df = pd.read_excel(excel_path)

    # 创建无向图
    G = nx.Graph()

    # 添加节点属性
    node_types = {}
    print(f"add_edge start")
    for _, row in tqdm(df.iterrows(), total=len(df)):
        G.add_edge(row['Source'], row['Target'], weight=row['weight'], description=row['description'])
        node_types[row['Source']] = row['Source Type']
        node_types[row['Target']] = row['Target Type']

    print(f"set_node_attributes start")
    nx.set_node_attributes(G, node_types, 'type')

    # 改进的路径验证函数
    def is_valid_path(path):
        """验证路径格式:允许故障现象直接连接或通过设备连接"""
        if len(path) < 2:
            return False

        # 验证首尾节点类型
        if G.nodes[path[0]]['type'] != '故障现象' or G.nodes[path[-1]]['type'] != '故障现象':
            return False

        # 处理1跳直接连接
        if len(path) == 2:
            return True

        # 处理多跳连接
        for node in path[1:-1]:
            if G.nodes[node]['type'] != '设备或部件名称':
                return False
        return True

    print("开始并行路径搜索...")
    all_paths = parallel_path_finding(G)

    # 路径去重处理
    seen = set()
    unique_paths = []
    print(f"all_paths start")
    for path in tqdm(all_paths):
        # 使用冻结集合处理无向图双向路径
        fs = frozenset([path[0], path[-1]])
        if (fs, len(path)) not in seen:
            seen.add((fs, len(path)))
            unique_paths.append(path)

    # 处理路径生成报表数据
    report_data = []
    print(f"unique_paths start")
    for path in tqdm(unique_paths):
        # 提取关键设备(排除故障现象节点)
        devices = list({node for node in path[1:-1] if G.nodes[node]['type'] == '设备或部件名称'})

        # 合并描述信息
        descriptions = []
        total_weight = 0
        for u, v in zip(path[:-1], path[1:]):
            edge_data = G.get_edge_data(u, v)
            if edge_data:  # 处理无向图边查询
                descriptions.append(edge_data['description'])
                total_weight += edge_data['weight']

        # 确定关系类型
        relation_type_map = {
            1: "直接连接",
            2: "设备关联(1跳)",
            3: "设备关联(2跳)",
            4: "设备关联(3跳)"
        }
        relation_type = relation_type_map.get(len(path), "复杂关联")

        report_data.append({
            '主故障现象': path[0],
            '关联故障现象': path[-1],
            '关联类型': relation_type,
            '路径详情': '→'.join(path),
            '关键设备/部件': '、'.join(devices) if devices else "无",
            '综合描述': '◆ '.join(descriptions),
            '权重评分': total_weight
        })
    print(f"创建DataFrame并去重 start")
    # 创建DataFrame并去重
    result_df = pd.DataFrame(report_data)
    # 按关键列去重
    result_df = result_df.drop_duplicates(
        subset=['主故障现象', '关联故障现象', '关联类型', '关键设备/部件'],
        keep='first'
    ).sort_values(by='权重评分', ascending=False)

    # 改进的Excel生成函数
    def create_styled_excel(df, filename):
        wb = Workbook()
        ws = wb.active
        ws.title = "故障现象关联分析"

        # 优化后的列配置
        columns = [
            ('主故障现象', 20),
            ('关联故障现象', 20),
            ('关联类型', 15),
            ('路径详情', 35),
            ('关键设备/部件', 25),
            ('综合描述', 50),
            ('权重评分', 15)
        ]

        # 写入表头
        ws.append([col[0] for col in columns])

        # 写入数据
        for _, row in df.iterrows():
            ws.append([row[col[0]] for col in columns])

        # 设置样式
        header_fill = PatternFill(start_color='4F81BD', end_color='4F81BD', fill_type='solid')
        header_font = Font(bold=True, color='FFFFFF')

        # 设置列宽和格式
        for idx, (_, width) in enumerate(columns, 1):
            ws.column_dimensions[get_column_letter(idx)].width = width
            # 设置自动换行
            for cell in ws[get_column_letter(idx)]:
                cell.alignment = cell.alignment.copy(wrap_text=True)

        # 应用表头样式
        for cell in ws[1]:
            cell.fill = header_fill
            cell.font = header_font

        # 添加条件格式(权重评分)
        red_fill = PatternFill(start_color='FFC7CE', end_color='FFC7CE', fill_type='solid')
        for row in ws.iter_rows(min_row=2, max_col=7, max_row=ws.max_row):
            if row[6].value and row[6].value >= 10:  # 权重评分列
                row[6].fill = red_fill

        # 添加筛选和冻结
        ws.auto_filter.ref = ws.dimensions
        ws.freeze_panes = 'A2'

        wb.save(filename)

    # 生成最终Excel文件
    print(f"文本长度:{len(result_df)}")
    print(f"create_styled_excel start")
    create_styled_excel(result_df, excel_path.replace(".xlsx", "_hop2.xlsx"))


def device_level_determination(file_path):
    df = pd.read_excel(file_path)
    df = df[df["Source Type"] == "设备或部件名称"][df["Target Type"] == "设备或部件名称"]
    prompt_list = []
    origin_data = []
    for des, s, t, w in zip(df["description"], df["Source"], df["Target"], df["weight"]):
        prompt = "<|im_start|>system
请判断设备的层级(所属)关系,注意:答案只需回复A/B/C即可。<|im_end|>
" 
                 "<|im_start|>user
关系描述:{des}
A.{s}是{t}的父级,即{s}包含{t}; 
B.{t}是{s}的父级,即{t}包含{s}; 
C.无法确定{t}与{s}的关系或{t}与{s}属于平级关系
 请判断(只需回复A或B):<|im_end|><|im_start|>assistant
"
        prompt_list.append(prompt.format(des=des, s=s, t=t))
        origin_data.append([des, s, t, w])

    batch_size = 1024
    predicted_labels = []
    for i in tqdm(range(0, len(prompt_list), batch_size), total=round(len(prompt_list) / batch_size, 0)):
        batch_input_sentences = prompt_list[i:i + batch_size]
        predicted_label_list, batch_predicted_prob_list = llm_classifier(batch_input_sentences, ["A", "B"])
        predicted_labels.extend(predicted_label_list)

    D = nx.DiGraph()
    for label, od in zip(predicted_labels, origin_data):
        if label == 0:
            D.add_edge(od[1], od[2], relation=od[0], weight=od[3])
        elif label == 1:
            D.add_edge(od[2], od[1], relation=od[0], weight=od[3])
        else:
            D.add_node(od[1])
            D.add_node(od[2])


    def export_graph_to_excel(D, filename):
        rows = []


        # 处理单跳路径(直接边)
        for u, v, data in D.edges(data=True):
            check = 0
            if check == 0:
                rows.append({
                    'source': u,
                    'hop1': v,
                    # 'hop2': '',
                    # 'weight': data['weight'],
                    'description': data['relation']
                })

        # 创建DataFrame并保存
        df = pd.DataFrame(rows)

        df = df[['source', 'hop1','description']]

        # 去重(避免因多次遍历中间节点产生重复)
        df = df.drop_duplicates()

        # 保存到Excel
        df.to_excel(filename, index=False)
        print(f"Excel文件已保存至 {filename}")

    # 调用函数导出Excel
    export_graph_to_excel(D, "/data/device_level_determination_graph.xlsx")


def entity_alignment(file_path: str):
    from tqdm import tqdm
    df = pd.read_excel(file_path)
    batch_size = 1024

    s_input_sentences = []
    t_input_sentences = []
    for s, st, t, tt in zip(df["Source"], df["Source Type"], df["Target"], df["Target Type"]):
        prompt = "<|im_start|>system
请判断实体名词的属性,注意:答案只需回复A或B即可。<|im_end|>
" 
                 "<|im_start|>user
问题:{entity}是{entity_type}吗?
A.是; B.不是; 请判断(只需回复A或B):<|im_end|><|im_start|>assistant
"

        s_input_sentences.append(prompt.format(entity=s, entity_type=st))
        t_input_sentences.append(prompt.format(entity=t, entity_type=tt))

    for ti, input_sentences in enumerate([s_input_sentences, t_input_sentences]):
        predicted_probabilities = []
        for i in tqdm(range(0, len(input_sentences), batch_size), total=round(len(input_sentences) / batch_size, 0)):
            batch_input_sentences = input_sentences[i:i + batch_size]
            _, batch_predicted_prob_list = llm_classifier(batch_input_sentences, ["A"])
            predicted_probabilities.extend(batch_predicted_prob_list)
        entity_type = "SourceType" if ti == 0 else "TargetType"
        df[f"{entity_type}_probabilities"] = predicted_probabilities
        df.to_excel(file_path.replace(".xlsx", ".xlsx"), index=False)


def entity_alignment_filter(file_path: str):
    df = pd.read_excel(file_path)
    print(f"原文本长度:{len(df)}")
    df = df[df["SourceType_probabilities"] > 0.5][df["TargetType_probabilities"] > 0.5]
    print(f"剩余文本长度:{len(df)}")
    df.to_excel(file_path.replace(".xlsx", "_probabilities.xlsx"), index=False)

def llm_infer(prompt):
    rewrite_server_url = "http://127.0.0.1:7777/v1/completions"
    rewrite_server_headers = {
        'Content-Type': 'application/json'
    }
    rewrite_server_data = {
        'model': "/data/llm_weights/qwen/Qwen2___5-7B-Instruct-GPTQ-Int4",
        'prompt': prompt,
        'max_tokens': 1024,
        'temperature': 0,
        'stop': ["<|im_end|>"],
        # 'logprobs': 10
    }
    response = requests.post(rewrite_server_url, headers=rewrite_server_headers, data=json.dumps(rewrite_server_data))
    return response.json()['choices'][0]['text']
    # print(response.json())
    # return response.json()

def create_chain_via_llm(file_path):

    df = pd.read_excel(file_path)
    df = df[df["Target Type"]=="故障现象"][df["Source Type"]=="故障现象"]
    for des, s, t, w in zip(df["description"], df["Source"], df["Target"], df["weight"]):
        prompt = "<|im_start|>system
请阅读根据设备故障和诱发故障的描述,并根据该描述推理出“设备”、“故障”之间的依赖关系(依赖长度根据描述文本确定)。
" 
                 "“设备”、“故障”之间请用“->”标记
" 
                 "例如:
" 
                 "依赖描述:电网电压降可能导致外绝缘破损,影响发电机的运行状态。
" 
                 "“设备”、“故障”之间的依赖关系为:发电机->电网电压降->发电机->绝缘破损 <|im_end|>
" 
                 "<|im_start|>user
依赖描述:{des}
“设备”、“故障”之间的依赖关系为:<|im_end|><|im_start|>assistant
"
        print("*"*10)
        print(des, s, t)
        print(llm_infer(prompt))

def final_clear():
    substation_df = pd.read_excel("/data/substation_network_edges_0205_c_probabilities.xlsx")
    device_df = pd.read_excel("/data/device_level_determination_graph.xlsx")

    s_rel_fault_list_all = []
    s_rel_fault_des_list_all = []
    s_detection_tools_list_all_a = []
    s_detection_tools_des_list_all_a = []

    device_2_list_all_fa = []
    device_2_des_list_all_fa = []
    for s, h1 in tqdm(zip(device_df["source"], device_df["hop1"]), total=len(device_df)):
        current_df = substation_df[substation_df["Source"] == h1][substation_df["Target Type"] == "故障现象"][:10]
        if len(current_df) != 0:
            s_rel_fault_list = "
".join(list(current_df["Target"]))
            s_rel_fault_des_list = "
".join(list(current_df["description"]))
            s_detection_tools_list_all = []
            s_detection_tools_des_list_all = []

            for fault in s_rel_fault_list.split("\n"):
                current_df = substation_df[substation_df["Source"] == fault][substation_df["Target Type"] == "检测工具"][:10]
                if len(current_df) != 0:
                    s_detection_tools_list = "
".join(list(current_df["Target"]))
                    s_detection_tools_des_list = "
".join(list(current_df["description"]))
                else:
                    s_detection_tools_list = ""
                    s_detection_tools_des_list = ""

                s_detection_tools_list_all.append(s_detection_tools_list)
                s_detection_tools_des_list_all.append(s_detection_tools_des_list)

            s_detection_tools_list_all_a.append("\n\n".join(s_detection_tools_list_all).strip())
            s_detection_tools_des_list_all_a.append("\n\n".join(s_detection_tools_des_list_all).strip())

            device_2_list_all = []
            device_2_des_list_all = []
            for fault in s_rel_fault_list.split("\n"):
                current_df = substation_df[substation_df["Source"] == fault][substation_df["Target Type"] == "设备或部件名称"][:10]
                if len(current_df) != 0:
                    current_device_t1 = list(device_df[device_df["source"] == s]["hop1"][:10])
                    current_device_t1_map = []
                    current_device_t1_des_map = []
                    for i in current_device_t1:
                        cdf = current_df[current_df["Target"]==i]

                        if len(cdf)==1:
                            current_device_t1_map.append(i)
                            current_device_t1_des_map.append(list(cdf["description"])[0])
                        else:
                            current_device_t1_map.append("")
                            current_device_t1_des_map.append("")

                    device_2_list = "
".join(current_device_t1_map).strip()
                    device_2_des_list = "
".join(current_device_t1_des_map).strip()
                else:
                    device_2_list = ""
                    device_2_des_list = ""

                device_2_list_all.append(device_2_list)
                device_2_des_list_all.append(device_2_des_list)

            device_2_list_all_fa.append("

".join(device_2_list_all).strip())
            device_2_des_list_all_fa.append("

".join(device_2_des_list_all).strip())
        else:
            s_rel_fault_list = ""
            s_rel_fault_des_list = ""
            s_detection_tools_list_all_a.append("")
            s_detection_tools_des_list_all_a.append("")
            device_2_list_all_fa.append("")
            device_2_des_list_all_fa.append("")

        s_rel_fault_list_all.append(s_rel_fault_list)
        s_rel_fault_des_list_all.append(s_rel_fault_des_list)



    device_df["子设备相关故障"] = s_rel_fault_list_all
    device_df["子设备相关故障描述"] = s_rel_fault_des_list_all

    device_df["子类设备的相关故障所涉及的检测手段"] = s_detection_tools_list_all_a
    device_df["子类设备的相关故障所涉及的检测手段描述"] = s_detection_tools_des_list_all_a

    device_df["子类设备的相关故障所影响的其他设备(只关联该母设备下的设备)"] = device_2_list_all_fa
    device_df["子类设备的相关故障所影响的其他设备(只关联该母设备下的设备)描述"] = device_2_des_list_all_fa
    device_df.to_excel("/data/equipment_failure.xlsx", index=False)


if __name__ == "__main__":
    # 读取知识图谱并导出为 Excel
    working_dir = '/data/test_books/'

    G = nx.read_graphml(f'{working_dir}/graph_chunk_entity_relation.graphml')

    # 导出Excel
    networkx_to_excel(G,
                      f"{working_dir}/substation_network_nodes.xlsx",
                      f"{working_dir}/substation_network_edges.xlsx")

5. Excel后处理

(1)输入:
        1)base_dir:知识图谱提取后的文件夹
        2)origin_book_path_list: 知识图谱提取对应的原书籍

        包含下述文件:

        ├── kv_store_text_chunks.json
        ├── kv_store_full_docs.json
        ├── substation_network_nodes.xlsx
        └── substation_network_edges.xlsx

(2)输出:
        1)kv_store_text_chunks_helpful.xlsx:对所有 chunk 做电力知识的初步过滤
        2)kv_store_text_chunks_final.xlsx:对 kv_store_text_chunks_helpful 的进一步过滤
        3)evaluation_dataset.xlsx:包含对每个 chunk 提问的表格
        4)交直流滤波器重要书籍确认.xlsx:所有用于提取知识图谱的书籍信息
        5)更新 substation_network_nodes.xlsx
        6)更新 substation_network_edges.xlsx

# 多步骤数据清洗和增强
post_process(base_dir=working_dir, origin_book_path_list=glob.glob(book_path))
_make_ann(working_dir, glob.glob(book_path))

处理步骤包括:

文本块过滤:使用LLM或字符集大小判断文本块的有用性

实体对齐:验证抽取实体的类型准确性

关系验证:确保实体间关系的合理性

数据去重:移除重复的知识条目

知识总结和合并:生成书籍摘要并与原始chunk合并(入库前的列检查示例见下)
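
入库前可以用类似下面的草稿核对最终表格的关键列是否齐全(列名来自下文 concat 函数中的注释,文件路径按实际输出调整):

import pandas as pd

final_df = pd.read_excel('/data/test_books/kv_store_text_chunks_final.xlsx')

required_columns = ['id', 'tokens', 'content', 'chunk_order_index', 'full_doc_id',
                    'chunk_filter', 'book_name_by_origin_file', 'text_type', 'indices']
missing = [c for c in required_columns if c not in final_df.columns]
print('缺失列:', missing if missing else '无')
print('文本条数:', len(final_df), '其中摘要条数:', int((final_df['text_type'] == 'summary').sum()))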

完整代码:

### propositioning.py

# -*- coding: utf-8 -*-            
# @Author : lixiaoran
# @Time : 2025/3/8 09:43
import json
import requests
import pandas as pd
from tqdm import tqdm
import glob


def llm_infer(
        prompt_list,
        max_tokens
):
    response = requests.post("http://127.0.0.1:7789/v1/completions", json={
        "model": requests.get("http://127.0.0.1:7789/v1/models").json()["data"][0]["id"],
        "prompt": prompt_list,
        "stream": False,
        "top_k": 10,
        "top_p": 0.75,
        "stop": ["<|im_end|>"],
        "max_tokens": max_tokens,
        "temperature": 0.65}, headers={"Content-Type": "application/json"}, stream=False)
    return [i["text"].strip() for i in response.json()["choices"]]


proposal_indexing_system_prompt = """
将"内容"部分分解为清晰简单的命题,确保脱离上下文仍可理解。
1. 将复合句拆分为简单句,尽可能保留原文措辞
2. 对包含额外描述信息的命名实体,将其描述信息分离为独立命题
3. 通过添加必要修饰语和代称替换实现去语境化:
   - 为名词或整句添加修饰语
   - 将代词(如"它"、"他"、"她"、"他们"、"这"、"那")替换为所指对象的全称
4. 结果以字符串列表呈现,如: ["",...,""]

示例:

输入:
内容:关于复活节野兔(Osterhase)的最早记载由医学教授Georg Franck von Franckenau于1678年在德国西南部记录,但该证据直到18世纪才在德国其他地区为人所知。学者Richard Sermon指出"春季常见野兔出没花园,这可能是对藏彩蛋习俗的合理解释。另有一种欧洲传统认为野兔会产卵,因为野兔的抓痕/巢穴与田凫的巢穴十分相似,且两者都出现在春季的草地上。"19世纪复活节贺卡、玩具和书籍的流行使复活节野兔/家兔风靡欧洲。德国移民将此习俗传入英美后,逐渐演变为复活节兔子。

输出:
[
  "关于复活节野兔的最早证据由Georg Franck von Franckenau于1678年在德国西南部记录",
  "Georg Franck von Franckenau是医学教授",
  "该证据直到18世纪才在德国其他地区为人所知",
  "Richard Sermon是学者",
  "Richard Sermon提出了关于野兔与复活节传统关联的解释假说",
  "春季常见野兔出没花园",
  "野兔可能是对花园藏彩蛋习俗的合理解释",
  "存在野兔会产卵的欧洲传统",
  "野兔的抓痕/巢穴与田凫的巢穴十分相似",
  "野兔和田凫巢穴都出现在春季的草地上",
  "19世纪复活节贺卡、玩具和书籍的流行使复活节野兔/家兔风靡欧洲",
  "德国移民将复活节野兔/家兔习俗传入英美",
  "该习俗在英美演变为复活节兔子"
]
"""
proposal_indexing_user_prompt = "分解以下内容:
{chunk}"

rewrite_chunk_system_prompt = "请重新整理用户提供的段落文本(可能是几个字、一句话也、一个段落或者是一张表格都有可能),要求重写逻辑格式清晰易读,删除图片之类的影响阅读的冗余信息,保证输出的段落逻辑流畅,没有冗余信息,结构清晰,标题和子标题使用恰当,如有表格请解析成json格式。确保技术术语准确,没有遗漏重要信息,同时删除所有图片相关的标记和说明。注意:表格请解析成json格式
注意:生成后的文本会全部作为正式官网数据保存请确保不要有冗余信息!"

origin_chunk = "主运系统(极控以及相应的 VBE)检测到故障后,切换逻辑自动启动,切换到备用系统。如果备用系统也故障了,则切换逻辑会发出跳闸命令闭锁相应极。

极控系统同时还提供了直流保护功能,作为附加的直流后备保护。包括换流器故障保护、开路试验故障保护、触发角过大保护、直流线路永久故障快速闭锁。

![](27_3.png)

图 1-25 兴仁换流站直流站控冗余配置示意图

![](27_4.png)

图 1-27 自动选择运行原理图

情况可自由更换故障部分设备。

阀控系统的主要功能如下:

(1)产生触发脉冲分别触发相应的晶闸管。

(2)检测晶闸管及其相关设备的状态。

(3)在反向恢复期对晶闸管进行保护。

(4)产生阀电流过零关断信号发送至极控系统。

目前南方电网已投运的直流输电工程,晶闸管阀的触发方式主要有光电转换触发和光直接触发两种。对于采用光电转换触发的晶闸管阀,其阀控系统主要由阀基电子设备(VBE)、光纤、TE板(Thyristor Electronics Board)组成;对于采用光直接触发的晶闸管阀,其阀控系统主要由阀基电子设备(VBE)、光纤、光分配器(MSC)、RPU(Recovery-Protection-Unit Board)晶闸管电压监测(TVM)组成。

六、保护系统

换流站保护主要分交流系统保护、直流系统保护以及站用电系统保护三大部 分。全站保护构成见图 1-28。

全站保护共分 10 个区域,保护区域分区见图 1-29。

各区域保护介绍如下:

(一)交流母线保护

交流母线保护配置与常规交流变电站基本相同,配置见图 1-30("""

deepseek_rewrite_chunk = """
直流输电系统冗余保护与控制系统架构说明

一、主系统冗余切换机制
当主运系统(含极控及VBE)检测到运行故障时,自动切换逻辑将立即启动,将控制权转移至备用系统。若备用系统同时发生故障,切换逻辑将发出跳闸指令实现极闭锁。

二、极控系统保护功能
作为直流后备保护的重要组成部分,极控系统配置以下保护机制:
1. 换流器故障保护
2. 开路试验故障保护
3. 触发角过载保护
4. 直流线路永久故障快速闭锁

三、阀控系统核心功能
1. 晶闸管触发控制
   - 生成触发脉冲驱动对应晶闸管
   - 发送阀电流过零关断信号至上级极控系统

2. 设备状态监测
   - 实时检测晶闸管及相关设备运行状态
   - 实施反向恢复期晶闸管保护

四、晶闸管触发技术方案
当前南方电网直流工程采用两种触发方式:
1. 光电转换触发系统架构
   - 阀基电子设备(VBE)
   - 光纤传输系统
   - TE电子控制板

2. 光直接触发系统架构
   - 阀基电子设备(VBE)
   - 光纤及光分配器(MSC)
   - 综合保护单元(RPU)
   - 晶闸管电压监测模块(TVM)

五、换流站保护体系
采用模块化分区设计,包含三大保护子系统:
(一)交流系统保护
(二)直流系统保护
(三)站用电系统保护

全站保护体系划分10个功能区域,各区域配置独立保护装置。其中交流母线保护区域配置与常规交流变电站保持兼容,确保系统间保护协调性。

注:系统设计支持故障模块在线更换功能,确保维护期间系统持续运行能力。
"""
rewrite_user_prompt = "现在请重写下面这个原文本的信息:
{chunk}
"

ann_prompt = """
你是个文本标注员,你只需要根据下面标注要求判断该文本是否需要,你需要判断用户提供内容是否需要即可。只需回复“需要”或者“不需要”
标注要求:
1.	实体名词是否有利用价值没利用价值直接舍弃(如:设备编号,故障号码的等)
2.	实体名词颗粒度过大或过小剔除或保留(如:智能电力系统,某型号螺丝等)
3.	实体关系的描述是否在文本上下文(chunk)中有叙述,没有叙述则舍弃
4.	实体关系的强相关性(比如很浅显的关系需要剔除:绝缘体生锈会损坏之类的)"""


def propositioning(
        text_chunks_excel: str,
        text_title: str
):
    df = pd.read_excel(text_chunks_excel)
    df = df[df["chunk_filter"] == "helpful"]
    batch_size = 512
    # to list prompt
    # prompt = f"<|im_start|>system
{proposal_indexing_system_prompt}<|im_end|>
" 
    #          f"<|im_start|>user
{proposal_indexing_user_prompt}

(只需回复一个结果列表不需要解释)<|im_end|><|im_start|>assistant
"
    # to str prompt
    # prompt = f"<|im_start|>system
{rewrite_chunk_system_prompt}
例如下面这个例子(可以仿照这个例子的逻辑整理):

原文本信息:

{origin_chunk}

重写后文本信息

{deepseek_rewrite_chunk}

{'*' * 20}
<|im_end|>
" 
    #          f"<|im_start|>user
{rewrite_user_prompt}
<|im_end|><|im_start|>assistant
重写后文本:"
    # to str prompt non-example
    prompt = f"<|im_start|>system
{rewrite_chunk_system_prompt}
<|im_end|>
" 
             f"<|im_start|>user
{rewrite_user_prompt}
<|im_end|><|im_start|>assistant
重写后文本:"
    rewrite_content = []
    for chunks_i in tqdm(range(0, len(df), batch_size), total=int(len(df) / batch_size)):
        batch_data = [prompt.format(chunk=i[text_title]) for _, i in df[chunks_i:chunks_i + batch_size].iterrows()]
        answer_list = llm_infer(prompt_list=batch_data, max_tokens=2048)
        rewrite_content.extend(answer_list)
    df["rewrite_content"] = [rc.replace("||", "").replace("|
|", "").replace("|  |", "") for rc in rewrite_content]
    df.to_excel(text_chunks_excel.replace(".xlsx", "_helpful.xlsx"), index=False)


def chunk_filter_by_llm(
        kv_store_text_chunks_path: str
):
    text_chunks = [{**{"id": k}, **v} for k, v in json.loads(open(kv_store_text_chunks_path, "r").read()).items()]
    df = pd.DataFrame(text_chunks)
    batch_size = 512
    prompt = "<|im_start|>system
请判断用户提供的上下文中是否有有用的知识存在,请仔细阅读思考作答,回复只需回复 “有” 或 “没有” 即可!|im_end|>
" 
             "<|im_start|>user
请判断该文本有没有电力学相关知识:
{text}
<|im_end|><|im_start|>assistant
"
    predicted_labels = []
    for i in tqdm(range(0, len(text_chunks), batch_size), total=round(len(text_chunks) / batch_size)):
        batch_input_sentences = [prompt.format(text=i["content"]) for i in text_chunks[i:i + batch_size]]
        predicted_label_list = llm_infer(prompt_list=batch_input_sentences, max_tokens=5)
        predicted_labels.extend(["null" if "没有" in i else "helpful" for i in predicted_label_list])
    df["chunk_filter"] = predicted_labels
    df.to_excel(kv_store_text_chunks_path.replace(".json", ".xlsx"), index=False)


def chunk_filter_by_set(
        kv_store_text_chunks_path: str,
        set_size: int
):
    text_chunks = [{**{"id": k}, **v} for k, v in json.loads(open(kv_store_text_chunks_path, "r").read()).items()]
    df = pd.DataFrame(text_chunks)
    df["chunk_filter"] = ["null" if len(set(c)) < set_size else "helpful" for c in df["content"]]
    df.to_excel(kv_store_text_chunks_path.replace(".json", "_helpful.xlsx"), index=False)


def create_book_name_by_llm(
        excel_path: str,
        text_title: str
):
    df = pd.read_excel(excel_path)
    batch_size = 512
    prompt = "<|im_start|>system
请根据用户提供的书籍片段内容生成该片段的书籍名称(注意!只需生成一个最符合的书籍名称即可无需生成其他内容),书籍名称请使用书名号“《》”包裹<|im_end|>
" 
             "<|im_start|>user
书籍片段内容如下:

{text}

<|im_end|><|im_start|>assistant
书籍名称为:"
    predicted_labels = []
    for i in tqdm(range(0, len(df), batch_size), total=round(len(df) / batch_size)):
        batch_input_sentences = [prompt.format(text=i[text_title]) for _, i in df[i:i + batch_size].iterrows()]
        predicted_label_list = llm_infer(prompt_list=batch_input_sentences, max_tokens=30)
        predicted_labels.extend(predicted_label_list)
    df[f"book_name"] = [i.replace("《", "").replace("》", " ").strip() for i in predicted_labels]
    df.to_excel(excel_path, index=False)


def create_book_name_by_origin_file_by_chunk(
        origin_book_path_list: list,
        doc_id_path: str,
        excel_path: str
):
    book_text_to_book_name = {open(i, "r").read().strip(): i.split("/")[-1].replace(".md","") for i in origin_book_path_list}
    book_text_to_doc_id = {v["content"]: k for k, v in json.loads(open(doc_id_path, "r").read()).items()}
    doc_id_to_book_name = {
        dic_id: book_text_to_book_name[book_text] if book_text in book_text_to_book_name.keys() else "国家电网公司直流换流站验收管理规定"
        for book_text, dic_id in book_text_to_doc_id.items()}
    df = pd.read_excel(excel_path)
    df[f"book_name_by_origin_file"] = [doc_id_to_book_name[i] for i in df["full_doc_id"]]
    df.to_excel(excel_path, index=False)

def create_book_name_by_origin_file_by_network(
        input_excel_path: str,
        output_excel_path: str,
        subset:str
):
    input_df = pd.read_excel(input_excel_path)
    mapper = {row["id"]:row["book_name_by_origin_file"] for i, row in input_df.iterrows()}

    out_df = pd.read_excel(output_excel_path)
    out_df[f"book_name_by_origin_file"] = ["国家电网公司直流换流站验收管理规定" if i.split("<SEP>")[0] not in mapper.keys() else mapper[i.split("<SEP>")[0]] for i in out_df[subset]]
    out_df.to_excel(output_excel_path, index=False)


def building_ready_data(
        kv_store_text_chunks_json: str,
        kv_store_text_chunks_excel_path: str,
        substation_network_nodes_path: str,
        substation_network_edges_path: str
):
    chunks_df = pd.read_excel(kv_store_text_chunks_excel_path)
    nodes_df = pd.read_excel(substation_network_nodes_path)
    edges_df = pd.read_excel(substation_network_edges_path)

    chunk_id_to_doc_id = {k: v["full_doc_id"] for k, v in
                          json.loads(open(kv_store_text_chunks_json, "r").read()).items()}

    doc_id_to_book_name = {i["full_doc_id"]: i["book_name_by_origin_file"] for _, i in chunks_df.iterrows()}

    chunks_df["text_type"] = ["text" for _, _, in chunks_df.iterrows()]
    nodes_df["text_type"] = ["text" for _, _, in nodes_df.iterrows()]
    edges_df["text_type"] = ["text" for _, _, in edges_df.iterrows()]

    nodes_df_book_name_by_origin_file = []
    for _, i, in nodes_df.iterrows():
        if i["source_id"].split("<SEP>")[0] in chunk_id_to_doc_id.keys() and chunk_id_to_doc_id[
            i["source_id"].split("<SEP>")[0]] in doc_id_to_book_name.keys():
            nodes_df_book_name_by_origin_file.append(
                doc_id_to_book_name[chunk_id_to_doc_id[i["source_id"].split("<SEP>")[0]]])
        else:
            nodes_df_book_name_by_origin_file.append("电力知识手册")
    nodes_df["book_name_by_origin_file"] = nodes_df_book_name_by_origin_file

    edges_df_book_name_by_origin_file = []
    for _, i, in edges_df.iterrows():
        if i["source_id"].split("<SEP>")[0] in chunk_id_to_doc_id.keys() and chunk_id_to_doc_id[
            i["source_id"].split("<SEP>")[0]] in doc_id_to_book_name.keys():
            edges_df_book_name_by_origin_file.append(
                doc_id_to_book_name[chunk_id_to_doc_id[i["source_id"].split("<SEP>")[0]]])
        else:
            edges_df_book_name_by_origin_file.append("电力知识手册")
    edges_df["book_name_by_origin_file"] = edges_df_book_name_by_origin_file

    chunks_df["indices"] = [i["chunk_order_index"] for _, i, in chunks_df.iterrows()]
    nodes_df["indices"] = [0 for _, i, in nodes_df.iterrows()]
    edges_df["indices"] = [i["weight"] for _, i, in edges_df.iterrows()]

    chunks_df.to_excel(kv_store_text_chunks_excel_path, index=False)
    nodes_df.to_excel(substation_network_nodes_path, index=False)
    edges_df.to_excel(substation_network_edges_path, index=False)


def post_process(base_dir, origin_book_path_list):
    # 上下文文本过滤
    print(" 上下文文本过滤 ".center(50, "="))
    # chunk_filter_by_llm(kv_store_text_chunks_path=f"{base_dir}/kv_store_text_chunks.json")
    chunk_filter_by_set(kv_store_text_chunks_path=f"{base_dir}/kv_store_text_chunks.json", set_size=10)

    # 重新誊写上下文文本
    # print(" 重新誊写上下文文本 ".center(50, "="))
    # propositioning(
    #     text_chunks_excel=f"{base_dir}/kv_store_text_chunks.xlsx",
    #     text_title="content"
    # )
    # 为上下文构造名称与段落类型
    # print(" 为上下文构造名称与段落类型 ".center(50, "="))
    # create_book_name_by_llm(
    #     excel_path=f"{base_dir}/kv_store_text_chunks_elec.xlsx",
    #     text_title="content"
    # )
    create_book_name_by_origin_file_by_chunk(
        origin_book_path_list=origin_book_path_list,
        doc_id_path=f"{base_dir}/kv_store_full_docs.json",
        excel_path=f"{base_dir}/kv_store_text_chunks_helpful.xlsx"
    )
    create_book_name_by_origin_file_by_network(
        input_excel_path=f"{base_dir}/kv_store_text_chunks_helpful.xlsx",
        output_excel_path=f"{base_dir}/substation_network_nodes.xlsx",
        subset="source_id"
    )
    create_book_name_by_origin_file_by_network(
        input_excel_path=f"{base_dir}/kv_store_text_chunks_helpful.xlsx",
        output_excel_path=f"{base_dir}/substation_network_edges.xlsx",
        subset="source_id"
    )

    # 制作准备入库数据
    print(" 制作准备入库数据 ".center(50, "="))
    building_ready_data(
        kv_store_text_chunks_json=f"{base_dir}/kv_store_text_chunks.json",
        kv_store_text_chunks_excel_path=f"{base_dir}/kv_store_text_chunks_helpful.xlsx",
        substation_network_nodes_path=f"{base_dir}/substation_network_nodes.xlsx",
        substation_network_edges_path=f"{base_dir}/substation_network_edges.xlsx"
    )



def annotation_data_preparation():
    # 武汉交直流
    base_dir = "/data3/workspace-lxr/KGs/qa_wh_acdc_lightrag_kg_qwen2.5_72b_int4"
    chunks_df = pd.read_excel(f"{base_dir}/kv_store_text_chunks.xlsx")
    edges_df = pd.read_excel(f"{base_dir}/substation_network_edges.xlsx")
    nodes_df = pd.read_excel(f"{base_dir}/substation_network_nodes.xlsx")
    chunk_id_to_chunk_text = {i["id"]: {
        "content": i["content"],
        "rewrite_content": i["rewrite_content"],
        "full_doc_id": i["full_doc_id"]
    } for _, i in chunks_df.iterrows()}

    edges_df["content"] = [chunk_id_to_chunk_text[i]["content"] if i in chunk_id_to_chunk_text else "" for i in
                           edges_df["source_id"]]
    nodes_df["content"] = [chunk_id_to_chunk_text[i]["content"] if i in chunk_id_to_chunk_text else "" for i in
                           nodes_df["source_id"]]

    edges_df["rewrite_content"] = [chunk_id_to_chunk_text[i]["rewrite_content"] if i in chunk_id_to_chunk_text else ""
                                   for i in edges_df["source_id"]]
    nodes_df["rewrite_content"] = [chunk_id_to_chunk_text[i]["rewrite_content"] if i in chunk_id_to_chunk_text else ""
                                   for i in nodes_df["source_id"]]

    edges_df["full_doc_id"] = [chunk_id_to_chunk_text[i]["full_doc_id"] if i in chunk_id_to_chunk_text else "" for i in
                               edges_df["source_id"]]
    nodes_df["full_doc_id"] = [chunk_id_to_chunk_text[i]["full_doc_id"] if i in chunk_id_to_chunk_text else "" for i in
                               nodes_df["source_id"]]

    edges_df.to_excel(f"{base_dir}/substation_network_edges_all.xlsx", index=False)
    nodes_df.to_excel(f"{base_dir}/substation_network_nodes_all.xlsx", index=False)


import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
import random


def select_diverse_texts(texts, num_to_select=50):
    """
    使用TF-IDF和最大差异策略选择代表性文本

    参数:
    texts - 文本列表(需要去重的原始文本)
    num_to_select - 需要保留的文本数量

    返回:
    (筛选后的文本列表, 被选中文本的索引列表)
    """
    # 使用TF-IDF进行文本向量化
    vectorizer = TfidfVectorizer(stop_words='english', max_features=10000)
    tfidf_matrix = vectorizer.fit_transform(texts)

    # 初始化索引集合
    n = len(texts)
    if n <= num_to_select:
        return texts

    selected = []
    remaining = list(range(n))

    # 随机选择第一个样本
    initial_idx = random.choice(remaining)
    selected.append(initial_idx)
    remaining.remove(initial_idx)

    # 初始化相似度跟踪
    current_vector = tfidf_matrix[initial_idx]
    similarity_scores = current_vector.dot(tfidf_matrix[remaining].T).toarray().flatten()

    while len(selected) < num_to_select and remaining:
        # 选择差异最大的样本
        min_idx = np.argmin(similarity_scores)
        selected_idx = remaining[min_idx]
        selected.append(selected_idx)

        # 更新剩余样本索引
        del remaining[min_idx]

        # 计算新样本与剩余样本的相似度
        new_vector = tfidf_matrix[selected_idx]
        new_similarities = new_vector.dot(tfidf_matrix[remaining].T).toarray().flatten()

        # 更新相似度跟踪(取历史最大值)
        similarity_scores = np.delete(similarity_scores, min_idx)
        similarity_scores = np.maximum(similarity_scores, new_similarities)

    return [texts[i] for i in selected], selected


def annotation_data_preparation():
    base_dir = "/data3/qa_wh_acdc_lightrag_kg_qwen2.5_72b_int4"
    edges_dir = f"{base_dir}/substation_network_edges_all.xlsx"
    nodes_dir = f"{base_dir}/substation_network_nodes_all.xlsx"
    edges_df = pd.read_excel(edges_dir)
    nodes_df = pd.read_excel(nodes_dir)

    _, edges_indexes = select_diverse_texts(
        [f"{i['Source']}{i['Target']}{i['description']}" for _, i in edges_df.iterrows()], num_to_select=50)
    _, nodes_indexes = select_diverse_texts([f"{i['Node ID']}{i['description']}" for _, i in nodes_df.iterrows()],
                                            num_to_select=50)
    edges_df["ann"] = ["check" if i in edges_indexes else "uncheck" for i in range(len(edges_df))]
    print("a")
    nodes_df["ann"] = ["check" if i in nodes_indexes else "uncheck" for i in range(len(nodes_df))]
    edges_df.to_excel(edges_dir, index=False)
    nodes_df.to_excel(nodes_dir, index=False)


def error_case_analysis():
    file_path = "/data3/qa_wh_acdc_lightrag_kg_qwen2.5_72b_int4/大模型问答测试问题清单.xlsx"
    df = pd.read_excel(file_path)
    log_data = [i.split("
[seqseqseq]
") for i in
                open(
                    "/data3/qa_wh_acdc_lightrag_kg_qwen2.5_72b_int4/test_0320_5.txt").read().split(
                    "
[endendend]
") if len(i.split("
[seqseqseq]
")) == 2]
    df["大模型回答_0321_2"] = [i for i, _ in log_data]
    df["书籍上下文_0321_2"] = [i for _, i in log_data]
    df.to_excel(file_path, index=False)
    # 说明交流滤波器投运前需要检查的50个项目


def book_summary(file_path):
    bz = 256

    df = pd.read_excel(file_path)
    prompt = "<|im_start|>system
请阅读并参考用户提供的书籍内容片段对该书籍的内容进行摘要总结,总结不要超过500字,总结前请先复述一遍书名最好列再出目次<|im_end|>
" 
             "<|im_start|>user
书籍名称:
{bookname}

书籍内容:
{context}

请对书籍进行摘要<|im_end|><|im_start|>assistant
书籍摘要总结:
"
    predicted_labels = []
    for i in tqdm(range(0, len(df), bz), total=round(len(df) / bz)):
        batch_input_sentences = [prompt.format(bookname=n, context=c[:5000] if type(c) == str else "空") for n, c in
                                 zip(list(df["book_name"])[i:i + bz], list(df["context"])[i:i + bz])]
        predicted_label_list = llm_infer(prompt_list=batch_input_sentences, max_tokens=1024)
        predicted_labels.extend(predicted_label_list)
        # print("predicted_label_list",predicted_label_list)
    df["summary"] = predicted_labels
    df.to_excel(file_path, index=False)
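
# Note: llm_infer() is assumed to be the batched inference helper defined earlier in this
# script (it sends the formatted prompts to the LLM service); each book context is
# truncated to its first 5000 characters before being filled into the prompt above.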


def chunk_summary(file_path):
    df = pd.read_excel(file_path)

    bz = 256
    prompt = "<|im_start|>system
阅读用户提供的段落。你的任务是生成一个对下一段文本的总结(一句话)以能够放在下段文本的头部补充缺失信息。<|im_end|>
" 
             "<|im_start|>user
追溯上文参考(少许overlap):
{context_1}



下一段文本信息:

{context_2}

<|im_end|><|im_start|>assistant
"
    predicted_labels = [""]

    content_1_list = list(df["content"])[:-1]
    content_2_list = list(df["content"])[1:]

    for i in tqdm(range(0, len(content_1_list), bz), total=round(len(content_1_list) / bz)):
        batch_input_sentences = [prompt.format(context_1=c_1, context_2=c_2) for c_1, c_2 in
                                 zip(content_1_list[i:i + bz], content_2_list[i:i + bz])]
        predicted_label_list = llm_infer(prompt_list=batch_input_sentences, max_tokens=1024)
        predicted_labels.extend(predicted_label_list)
        # print("

content_1

".center(50, "#"))
        # print(content_1_list[i:i + bz])
        #
        # print("

predicted_label_list

".center(50, "#"))
        # print(predicted_label_list)
        #
        #
        # print("

content_2

".center(50, "#"))
        # print(content_2_list[i:i + bz])

    df["chunk_summary"] = predicted_labels
    df.to_excel(file_path, index=False)


def concat(origin_file_path, add_file_path):
    from transformers import AutoTokenizer
    """
    id	tokens	content	chunk_order_index	full_doc_id	chunk_filter	book_name_by_origin_file	text_type	indices	chunk_summary
    """
    origin_df = pd.read_excel(origin_file_path)
    add_df = pd.read_excel(add_file_path)
    tokenizer = AutoTokenizer.from_pretrained('/data/bce/bce-embedding-base_v1/')

    add_list = []
    for i, row in tqdm(add_df.iterrows(), total=len(add_df)):
        try:
            current_full_doc_id = origin_df[origin_df["book_name_by_origin_file"] == row["book_name"]]["full_doc_id"].tolist()[0]
            current_chunk_id = origin_df[origin_df["book_name_by_origin_file"] == row["book_name"]]["id"].tolist()[0]
        except Exception as e:
            continue

        add_list.append({
            "id": current_chunk_id,
            "tokens": len(tokenizer(row["summary"])['input_ids']) - 2,
            "content": row["summary"],
            "chunk_order_index": -1,
            "full_doc_id": current_full_doc_id,
            "chunk_filter": "helpful",
            "book_name_by_origin_file": row["book_name"],
            "text_type": "summary",
            "indices": -1,
            "chunk_summary": "",
        })

    out_df = pd.concat([origin_df, pd.DataFrame(add_list)])

    out_df["content"] = [f"{s}
{c}" if type(s) == str else f"{c}" for c, s in
                         zip(out_df["content"], out_df["chunk_summary"])]
    out_df.to_excel(origin_file_path.replace("_helpful.xlsx", "_final.xlsx"), index=False)


def _make_ann(base_dir, origin_book_path_list): 
    import pandas as pd
    book_info = [[i.split("/")[-1].replace(".md", ""), open(i).read(), len(open(i).read())] for i in origin_book_path_list]
    df = pd.DataFrame(book_info, columns=["book_name", "context", "context_length"])
    df.to_excel(f"{base_dir}/交直流滤波器重要书籍确认.xlsx", index=False)


def make_evaluation_dataset(base_dir):
    df = pd.read_excel(f"{base_dir}/kv_store_text_chunks_final.xlsx")

    bz = 256
    prompt = "<|im_start|>system
请根据用户提供的文本片段生成一个问题,该问题的解答一定在用户提供的文本片段中找到,只需生成一个问题即可不需要解答该问题!<|im_end|>
" 
             "<|im_start|>user
文本片段:
{context}

<|im_end|><|im_start|>assistant
问题:"
    predicted_labels = []

    df = df[df["chunk_order_index"] > 10]

    content_list = list(df["content"])

    for i in tqdm(range(0, len(content_list), bz), total=round(len(content_list) / bz)):
        batch_input_sentences = [prompt.format(context=c) for c in content_list[i:i + bz]]
        predicted_label_list = llm_infer(prompt_list=batch_input_sentences, max_tokens=512)
        predicted_labels.extend(predicted_label_list)
        # print("

content

".center(50, "#"))
        # print(content_list[i:i + bz])
        # print("

predicted_label_list

".center(50, "#"))
        # print(predicted_label_list)

    df["created_query_by_chunk"] = predicted_labels
    df.to_excel(f"{base_dir}/evaluation_dataset.xlsx", index=False)


def add_book_level(book_level_book_path, origin_file_path):
    book_level_df = pd.read_excel(book_level_book_path)
    origin_file_df = pd.read_excel(origin_file_path)
    book_name_to_level = {row["book_name"]:row["level"] for i, row in book_level_df.iterrows()}
    # origin_file_df["level"] = [book_name_to_level[row["book_name_by_origin_file"]] for i, row in origin_file_df.iterrows()]
    origin_file_df["level"] = [1 if row["book_name_by_origin_file"] not in book_name_to_level else book_name_to_level[
        row["book_name_by_origin_file"]] for i, row in origin_file_df.iterrows()]
    origin_file_df.to_excel(origin_file_path, index=False)



if __name__ == "__main__":
    # 武汉交直流
    base_dir = "/data/test_books/"
    origin_book_path_list = glob.glob("/home/markdown_new_books/*.md")

    main(base_dir, origin_book_path_list)

    _make_ann(base_dir, origin_book_path_list)

    book_summary(file_path=f"{base_dir}/交直流滤波器重要书籍确认.xlsx")
    chunk_summary(file_path=f"{base_dir}/kv_store_text_chunks_helpful.xlsx")

    concat(origin_file_path=f"{base_dir}/kv_store_text_chunks_helpful.xlsx",
           add_file_path=f"{base_dir}/交直流滤波器重要书籍确认.xlsx")



    """
    CUDA_VISIBLE_DEVICES=5 nohup vllm serve /data1/workspace-lxr/pretrained_model_weights/llm_weights/Qwen/QwQ-32B-AWQ \
            --chat-template /data1/workspace-lxr/vllm-main/examples/tool_chat_template_hermes.jinja \
            --enable-auto-tool-choice --tool-call-parser hermes \
            --gpu-memory-utilization 0.9 \
            --max-num-seqs 512 \
            --port 7772 \
            --device cuda \
            --max-model-len 30000 \
            --uvicorn-log-level error \
            --quantization awq_marlin > qwq_32b_awq_marlin_hermes.log &

    """

6. Milvus向量数据库录入

(1) Inputs:
        1) input_excel_path: path of the Excel file to import into Milvus (the kv_store_text_chunks_final.xlsx file)
        2) current_table: name of the target Milvus collection

(2) Output:
        1) the data from input_excel_path stored in the specified Milvus collection

# Create the Milvus collection and insert the data
ic.insert_collection(
    current_table=collection_name,
    chunk_text_subset_name="content",
    # ... other parameters
)

Storage characteristics:

The BCE embedding model produces 768-dimensional dense vectors

BM25 sparse vectors complement the dense vectors for hybrid retrieval

Multiple scalar fields are indexed: book name, content type, text chunk, etc.

Retrieval combines vector similarity with keyword (BM25) matching
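
As a concrete illustration of this storage model (a sketch based on the write_col.py code below; the field values are made up), each Excel row is inserted as an entity like the one shown here. bm25_vector is not supplied by the client, because the collection's built-in BM25 function derives it from chunk_text at insert time:

# one hypothetical entity as built in insert_collection()
entity = {
    "book_name": "交流滤波器运行规程",      # source book of the chunk
    "indices": "3",                         # chunk index, stored as a string
    "types": "text",                        # text_type column (e.g. "text" or "summary")
    "chunk_text": "交流滤波器投运前应检查各元件外观……",  # raw chunk text, also feeds BM25
    "embeddings": [0.01] * 768,             # 768-dim BCE dense vector (float16 in Milvus)
    "book_level": "1",                      # importance level of the book
}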

Complete code:

### write_col.py

import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"
import glob
from datasketch import MinHash, MinHashLSH  # required by remove_duplicates()
from tqdm import tqdm
import os
import jieba
import re
import pandas as pd
# from pymilvus import (
#     FieldSchema, CollectionSchema, DataType,
#     Collection, connections,
# )
import pickle
from pymilvus.model.sparse.bm25.tokenizers import build_default_analyzer
from pymilvus.model.sparse import BM25EmbeddingFunction
from pymilvus import MilvusClient, DataType, Function, FunctionType
import numpy as np
from scipy.sparse import csr_matrix
from BCEmbedding import EmbeddingModel


def remove_duplicates(df, column_name, threshold=0.9, num_perm=512):
    regex = re.compile(",|。")

    def split_word(sentence):
        sentence = str(sentence)
        return [word for word in jieba.lcut(re.sub(regex, '', sentence)) if word.strip()]

    lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)
    unique_indices = set()
    minhashes = []
    for idx, row in df.iterrows():
        sentence_lcut = split_word(row[column_name])
        minhash = MinHash(num_perm=num_perm)
        minhash.update_batch([word.encode('utf-8') for word in sentence_lcut])

        # 查询是否已有相似的记录
        result = lsh.query(minhash)
        if not result:  # 如果没有相似的记录
            # print(idx, minhash)
            lsh.insert(idx, minhash)
            unique_indices.add(idx)
            minhashes.append(minhash)
    return df.loc[list(unique_indices)].reset_index(drop=True)
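
# Usage sketch (hypothetical file name): keep only rows whose "content" is less than 90%
# MinHash-similar to an earlier row; requires the datasketch import enabled above.
# deduped_df = remove_duplicates(pd.read_excel("some_chunks.xlsx"), column_name="content", threshold=0.9)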


# class SimpleChineseTokenizer():
#     def __init__(self):
#         pass
#
#     def tokenize(self, text: str):
#         return list(text)
class insertCollection:
    def __init__(self, device: str, embedding_model_path: str):
        self.embedding_model = EmbeddingModel(model_name_or_path=embedding_model_path, 
                                              use_fp16=True, 
                                              device=device)
        # self.rerank_model = EmbeddingModel(model_name_or_path=rerank_model_path, use_fp16=True, device=device)

        # with open('/data1/workspace-lxr/data/book_excel_formal/book_info_0703.json', 'r') as file:
        #     self.name_to_intro = json.load(file)

        # connections.connect("default", host=host, port=port, user=username, password=password)
        self.client = MilvusClient(uri=f"http://{host}:{port}")

    def _create_collection(self, current_table: str):
        # connections.connect("default", host=host,
        #                     port=port, user=username, password=password)
        schema = self.client.create_schema(enable_dynamic_field = True)

        schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
        schema.add_field(field_name="book_name", datatype=DataType.VARCHAR, max_length=256)
        schema.add_field(field_name="indices", datatype=DataType.VARCHAR, max_length=4096)
        schema.add_field(field_name="types", datatype=DataType.VARCHAR, max_length=4096)
        schema.add_field(field_name="chunk_text", datatype=DataType.VARCHAR, max_length=40000, enable_analyzer=True, enable_match=True)
        schema.add_field(field_name="embeddings", datatype=DataType.FLOAT16_VECTOR, dim=768)
        schema.add_field(field_name="book_level", datatype=DataType.VARCHAR, max_length=16)
        schema.add_field(field_name="bm25_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

        bm25_function = Function(
            name="text_bm25_emb",  # Function name
            input_field_names=["chunk_text"],  # Name of the VARCHAR field containing raw text data
            output_field_names=["bm25_vector"],
            # Name of the SPARSE_FLOAT_VECTOR field reserved to store generated embeddings
            function_type=FunctionType.BM25,  # Set to `BM25`
        )

        schema.add_function(bm25_function)


        self.client.create_collection(
            collection_name=current_table,
            schema=schema,
            enable_analyzer=True,
            enable_dynamic_field = True
        )
        self._add_index(collection_name=current_table)
        # self.client.flush()
        print(f"process done at {current_table}")


    def _add_index(self, collection_name):
        index_params = self.client.prepare_index_params()

        index_params.add_index(
            field_name="bm25_vector",
            index_name="bm25_vector_index",
            index_type="SPARSE_INVERTED_INDEX",  # Inverted index type for sparse vectors
            metric_type="BM25",
            params={
                "inverted_index_algo": "DAAT_MAXSCORE",
                # Algorithm for building and querying the index. Valid values: DAAT_MAXSCORE, DAAT_WAND, TAAT_NAIVE.
                "bm25_k1": 1.2,
                "bm25_b": 0.75
            },
        )

        index_params.add_index(
            field_name="embeddings",
            index_name="embeddings_index",
            index_type="FLAT",
            metric_type="COSINE"
        )

        index_params.add_index(field_name="book_name", index_name="book_name_index")
        index_params.add_index(field_name="indices", index_name="indices_index")
        index_params.add_index(field_name="types", index_name="types_index")
        index_params.add_index(field_name="chunk_text", index_name="chunk_text_index")
        index_params.add_index(field_name="book_level", index_name="book_level_index")
        index_params.add_index(field_name="chunk_text", index_name="chunk_text_index")

        self.client.create_index(
            collection_name=collection_name,  # Specify the collection name
            index_params=index_params
        )


    def insert_collection(
            self,
            current_table: str,
            chunk_text_subset_name: str,
            book_name_subset_name: str,
            indices_subset_name: str,
            book_level_subset_name: str,
            types_subset_name: str,
            input_excel_path: str
    ):
        print(current_table.center(40, "$"))
        self._create_collection(current_table=current_table)
        current_type_books_df = pd.read_excel(input_excel_path)
        print(f"start current_type_books_df : {len(current_type_books_df)}")
        # current_type_books_df = current_type_books_df.replace("", np.nan).dropna()
        # print(f"drop nan current_type_books_df : {len(current_type_books_df)}")
        current_type_books_df[chunk_text_subset_name] = current_type_books_df[chunk_text_subset_name].astype(str)
        current_type_books_df = current_type_books_df.drop_duplicates(subset=[chunk_text_subset_name])
        print(f"deduplication :{len(current_type_books_df)}")
        

        chunks = np.array_split(current_type_books_df, np.ceil(len(current_type_books_df) / 500))
        for i, chunk in tqdm(enumerate(chunks), total=len(chunks)):
            chunk_embeddings = self.embedding_model.encode(list(chunk[chunk_text_subset_name]), enable_tqdm=False)
            # chunk_bm25_ef_embedding = bm25_ef.encode_documents(list(chunk[chunk_text_subset_name]))

            entities = []
            for row_index,(id, book_name, indices, types, chunk_text, embeddings, book_level) in enumerate(zip(
                    [i for i in range(len(chunk[chunk_text_subset_name]))],
                    chunk[book_name_subset_name],
                    [str(i) for i in chunk[indices_subset_name]],
                    chunk[types_subset_name],
                    chunk[chunk_text_subset_name],
                    chunk_embeddings,
                    ['1' for _ in range(len(chunk[chunk_text_subset_name]))]
                    # [str(i) for i in chunk[book_level_subset_name]]
            )):

                entities.append({
                    # "id": id,
                    "book_name": book_name,
                    "indices": indices,
                    "types": types,
                    "chunk_text": chunk_text,
                    "embeddings": embeddings,
                    "book_level": book_level,
                    # "bm25_vector": sparse_dict
                })
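                # bm25_vector is deliberately not supplied here: the BM25 Function attached
                # to the schema (see _create_collection) generates it server-side from chunk_text.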

            try:
                self.client.insert(current_table, entities)
                # self.client.flush()
            except Exception as e:
                print(e)
                # print(list(chunk[chunk_text_subset_name]))
                break
        print("done")


if __name__ == "__main__":
    host = "162.21.248.32"  # 127.0.0.1:19530
    # host = "127.0.0.1"  # 127.0.0.1:19530 
    port = 19530
    username = ""
    password = ""

    ic = insertCollection(device="cuda:6", embedding_model_path="/data3/bce/bce-embedding-base_v1")

    working_dir = '/data3/test_books/'
    current_table = "test_chunk_v1"

    input_excel_path = f"{working_dir}/kv_store_text_chunks_final.xlsx"

    ic.insert_collection(
        current_table=current_table,
        chunk_text_subset_name="content",
        book_name_subset_name="book_name_by_origin_file",  # book_name_by_origin_file
        indices_subset_name="indices",
        types_subset_name="text_type",
        book_level_subset_name="level",
        input_excel_path=input_excel_path,
    )
    ic._add_index(collection_name=current_table)
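
    # Optional sanity check (sketch; assumes the same host/port as above):
    # client = MilvusClient(uri=f"http://{host}:{port}")
    # print(client.describe_collection(current_table))
    # print(client.get_collection_stats(current_table))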

  

7. 数据库检索

(1) Inputs:
        1) queries: the list of queries to retrieve for
        2) collection_name: name of the Milvus collection (a list of names is also accepted)
        3) limit: number of results to return per query

(2) Output:
        1) the top-limit retrieved text chunks for each query (chunk_text, book_name, indices, types)

This module implements a hybrid retrieval system, consisting of:

A three-stage retrieval architecture: BM25 keyword search + dense-vector semantic search + a reranking model
Reciprocal Rank Fusion (RRF) to merge the keyword and semantic rankings
Cross-collection search, so several knowledge bases can be queried at once

Complete code:

### hybrid_search.py

# -*- coding: utf-8 -*-
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"
# from alg_common.loger_config import logger, decorate_all_methods
from BCEmbedding import EmbeddingModel, RerankerModel
from pymilvus import MilvusClient, DataType, Function, FunctionType
import time
import numpy as np


def sort_and_track_indices(lst):
    indexed_lst = list(enumerate(lst))
    sorted_lst = sorted(indexed_lst, key=lambda x: x[1], reverse=True)
    sorted_indices = [index for index, value in sorted_lst]
    return sorted_indices
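
# Example: sort_and_track_indices([0.2, 0.9, 0.5]) -> [1, 2, 0]
# (positions of the input values ordered from largest to smallest)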


def reciprocal_rank_fusion(rank_list1, rank_list2, k=60):
    """
    Combine two ranking lists using Reciprocal Rank Fusion (RRF).

    Parameters:
        rank_list1 (list of int): The indices of items in the first ranking list.
        rank_list2 (list of int): The indices of items in the second ranking list.

    Returns:
        list of int: A new ranking list sorted according to the combined RRF score.
    """
    # Initialize dictionary to hold reciprocal ranks
    rrf_scores = {}

    # K value in the RRF formula, often set to 60

    # Calculate RRF score for the first ranking list
    for rank, item in enumerate(rank_list1, start=1):
        rrf_scores[item] = rrf_scores.get(item, 0) + 1 / (k + rank)

    # Calculate RRF score for the second ranking list
    for rank, item in enumerate(rank_list2, start=1):
        rrf_scores[item] = rrf_scores.get(item, 0) + 1 / (k + rank)

    # Sort items based on their RRF scores in descending order
    sorted_items = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)

    # Return the list of items sorted by RRF score
    return [item[0] for item in sorted_items]
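
# Worked example (hypothetical ids): reciprocal_rank_fusion(["A", "B", "C"], ["B", "D"], k=60)
# returns ["B", "A", "D", "C"]: "B" ranks near the top of both lists, so its summed
# 1/(k+rank) score is the largest.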


# @decorate_all_methods
class HybridSearch:

    def __init__(self, embedding_model_path: str, rerank_model_path: str, books: list, dbname="default", host='0.0.0.0',
                 port=19530, user="", password="", device="cuda:0", collection_books={}, book_name_same_threshold=0.9):
        # host="127.0.0.1"
        # host = "192.168.6.212"
        # port = 19530

        """
        向量数据库混排搜索模型
        :param embedding_model_path: BCE Embedding 模型
        :param rerank_model_path: BCE 再排序 Embedding 模型
        :param collection_books: collection 名称到其书籍名称列表的映射(供 search_books_same 使用)
        :param book_name_same_threshold: 判定查询与书名相同的相似度阈值
        """

        self.books = {}
        self.embedding_model = EmbeddingModel(model_name_or_path=embedding_model_path, use_fp16=True, device=device)
        self.rerank_model = RerankerModel(model_name_or_path=rerank_model_path, use_fp16=True, device=device)
        self.lazy_initialization()  # 在embedding模型加载后执行一次
        self.dbname = dbname
        self.book_name_same_threshold = book_name_same_threshold
        self.collection_books = collection_books
        try:
            self.client = MilvusClient(uri=f"http://{host}:{port}")
        except Exception as e:
            print(f"hybrid_search error {e}")

    def lazy_initialization(self):
        try:
            lazy_initialization_start_time = time.time()
            self.embedding_model.encode(sentences=["lazy initialization test text"], enable_tqdm=False)
            self.rerank_model.compute_score([["lazy initialization test text", "lazy initialization test text"]],
                                            enable_tqdm=False)
            print(f"Embedding model lazy initialization time: {time.time() - lazy_initialization_start_time}秒")
        except Exception as e:
            print(f"Error for embedding model lazy initialization {e}..")

    def _search(self, text_list, collection, vector_field, limit, book_level:int=1):
        search_params = {
            'params': {'drop_ratio_search': 0.2},  # Proportion of small vector values to ignore during the search
        }

        return self.client.query(
            collection_name=collection,
            filter=f'book_level == "{book_level}"',
            data=text_list,
            anns_field=vector_field,
            limit=limit,
            search_params=search_params,
            output_fields=["indices", "chunk_text", "book_name", "types", "embeddings"]
        )


    def run(self, queries, collection_name, limit: int, book_level:int=1):

        """
        混排搜索forward函数
        :param queries: 需要查询的问题列表
        :param collection_name: 数据库名称,当前可用书籍库可查看:https://gitlab.aseit.cn/xiaoran.li/vector_database
        :param limit: 每个问题返回 Top limit 个相似度文本
        :return:
        """
        queries_bce_embeddings = self.embedding_model.encode(sentences=queries, enable_tqdm=False)

        def _single_collection_search(queries_bce_embeddings, collection_name):
            # 用 BM25 搜索
            top_n_items_by_bm25 = self._search(text_list=queries, collection=collection_name,
                                               vector_field="bm25_vector", limit=2000, book_level=book_level)
            bm_25_chunk_text = [i["chunk_text"] for i in top_n_items_by_bm25]
            embeddings_by_bm_25_rank = np.array([np.frombuffer(i["embeddings"][0], dtype=np.float16) for i in top_n_items_by_bm25])
            # 计算 bce embedding 和 bm25 embedding 的相似度
            embedding_score_by_bm_25_rank = (queries_bce_embeddings @ embeddings_by_bm_25_rank.T)[0]
            # 对相似度进行排序
            embedding_score_by_bm_25_rank = sort_and_track_indices(embedding_score_by_bm_25_rank)
            # 结合 RRF 进行排序
            helpful_embedding_score_by_bm_25_rank_index = reciprocal_rank_fusion(embedding_score_by_bm_25_rank,
                                                                                 [i for i in
                                                                                  range(len(embeddings_by_bm_25_rank))],
                                                                                 k=60)[:500]
            rerank_chunk_text = [bm_25_chunk_text[i] for i in helpful_embedding_score_by_bm_25_rank_index]
            rerank_chunk_text_pairs = [[queries[0], passage] for passage in rerank_chunk_text]

            # 用 rerank 模型计算文本对分数
            rerank_chunk_text_pairs_scores = self.rerank_model.compute_score(rerank_chunk_text_pairs, enable_tqdm=False)
            rerank_embedding_score = sort_and_track_indices(rerank_chunk_text_pairs_scores)[:limit]
            helpful_rerank_embedding_index = [helpful_embedding_score_by_bm_25_rank_index[i] for i in rerank_embedding_score]
            final_results = [top_n_items_by_bm25[i] for i in helpful_rerank_embedding_index]
            return final_results

        final_rerank_res = []
        if type(collection_name) == list:
            for c_name in collection_name:
                current_collection_rerank_res = _single_collection_search(queries_bce_embeddings=queries_bce_embeddings,
                                                                          collection_name=c_name)
                final_rerank_res.extend(current_collection_rerank_res)
        else:
            current_collection_rerank_res = _single_collection_search(queries_bce_embeddings=queries_bce_embeddings,
                                                                      collection_name=collection_name)
            final_rerank_res.extend(current_collection_rerank_res)

        current_re_rerank_list = []
        for i in final_rerank_res:
            current_re_rerank_list.append({
                "id": i["id"],
                "distance": 0.99,
                "entity": {
                    "chunk_text": i["chunk_text"],  # str
                    "book_name": i["book_name"],
                    "indices": i["indices"],
                    "types": i["types"],
                }
            })
        return [current_re_rerank_list]

    def search_books_same(self, queries: list, collection_names: list):
        same_books = {}
        if len(queries) == 0 or len(collection_names) == 0:
            return same_books
        books = []
        for co in collection_names:
            books.extend(self.collection_books.get(co, []))
        if len(books) == 0:
            return same_books
        exit_books_emb = self.embedding_model.encode(sentences=books, enable_tqdm=False)
        queries_embs = self.embedding_model.encode(sentences=queries, enable_tqdm=False)
        embedding_score_list = (queries_embs @ exit_books_emb.T)

        for index, (embedding_score) in enumerate(embedding_score_list):
            indexed_lst = list(enumerate(embedding_score))
            same_index, same_score = sorted(indexed_lst, key=lambda x: x[1], reverse=True)[0]
            if same_score > self.book_name_same_threshold:
                same_books[queries[index]] = books[same_index]
            else:
                same_books[queries[index]] = None
        return same_books


if __name__ == "__main__":
    """加载模型文件"""
    import glob

    os.environ["CUDA_VISIBLE_DEVICES"] = "1"

    queries = ["什么是滤波器"]
    collection_name = "test_rag_new"
    limit = 5

    books = glob.glob("/data3/kgs_book_data/test_0320/*.pkl")
    hs = HybridSearch(embedding_model_path="/data1/bce/bce-embedding-base_v1",
                      rerank_model_path="/data1/bce/bce-reranker-base_v1", books=[],
                      host='127.0.0.1',
                      port=19530)

    """搜索"""
    res = hs.run(queries=queries, collection_name=collection_name, limit=limit)

    # print(len(res[0]))
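    # Inspect the retrieved chunks (sketch of the return format built in run()):
    # for hit in res[0]:
    #     print(hit["entity"]["book_name"], hit["entity"]["chunk_text"][:50])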
    
