【Python】Python 与 Neo4j 交互(py2neo 使用)

第一部分:基石 – 图论、Neo4j 与 Cypher 的再审视

第一章:超越节点与关系:深度剖析标签属性图模型(LPG)

1.1 属性作为一等公民:数据类型、存储与索引内幕

在 Neo4j 的世界中,属性(Properties)并不仅仅是节点(Node)和关系(Relationship)的简单附庸。它们是构成图数据丰富语义的核心元素,是模型表达能力和查询性能的关键所在。将属性理解为一等公民,是从入门到精通的必经之路。

属性是一个键值对(Key-Value Pair)集合,其中键(Key)是一个字符串,而值(Value)可以是多种原生数据类型。

原生数据类型及其内部机制:

数值类型(Numeric Types):

Integer: Neo4j 内部使用 64 位有符号整数。在 py2neo 中,这直接对应 Python 的 int 类型。当数据通过 Bolt 协议传输时,会根据数值大小选择最高效的编码方式(例如,小整数使用单个字节)。
Float: Neo4j 内部使用 64 位双精度浮点数(IEEE 754 标准)。这直接对应 Python 的 float 类型。需要注意的是,与所有浮点数运算一样,可能会存在精度问题,不建议将浮点数用于需要精确计算的场景,如金融交易金额。

# 导入 Graph 类,用于连接到 Neo4j 数据库
from py2neo import Graph, Node

# 建立到本地 Neo4j 数据库的连接,默认用户是 neo4j,密码是 password
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 创建一个名为 "Product" 的节点
product_node = Node("Product") # 指定节点的标签为 "Product"

# 为节点设置整型属性 'stock_count'
product_node["stock_count"] = 1024 # 设置库存数量,这是一个整数

# 为节点设置浮点型属性 'unit_price'
product_node["unit_price"] = 199.98 # 设置单价,这是一个浮点数

# 在数据库中创建这个节点
graph.create(product_node) # 将该节点及其属性写入数据库

# 我们可以通过 Cypher 查询来验证
result = graph.run("""
    MATCH (p:Product)            // 匹配所有标签为 Product 的节点,并将其赋值给变量 p
    WHERE p.stock_count = 1024   // 添加条件,要求节点的 stock_count 属性值为 1024
    RETURN p.unit_price          // 返回该节点的 unit_price 属性
""").evaluate() # 使用 evaluate() 直接获取查询结果的第一个值

# 打印查询到的价格
print(f"查询到的产品单价是: {
                result}") # 格式化输出查询到的单价

字符串类型(String Type):

String: Neo4j 中的字符串使用 UTF-8 编码。这使得它可以存储世界上几乎所有的语言字符。py2neo 中的 Python str 类型会无缝地进行转换。字符串是图数据库中最常用的数据类型之一,用于存储名称、描述、ID 等信息。

# 导入 Graph 和 Node 类
from py2neo import Graph, Node

# 连接到本地 Neo4j 数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 创建一个名为 'User' 的节点
user_node = Node("User") # 指定节点的标签为 "User"

# 设置字符串属性 'username'
user_node["username"] = "alice_g" # 设置用户的登录名

# 设置一个包含多语言字符的字符串属性 'bio'
user_node["bio"] = "一个喜欢探索图数据库的开发者。こんにちは世界。" # 设置用户的个人简介,包含中文和日文

# 设置一个用作外部系统ID的字符串属性 'external_id'
user_node["external_id"] = "user-73f1b4a-11ee-4a2d-8b2c-35a16d8e0d6a" # 通常使用 UUID 或其他唯一标识符

# 将节点写入数据库
graph.create(user_node) # 执行创建操作

# 使用 Cypher 查询并验证
user_bio = graph.run("""
    MATCH (u:User {username: 'alice_g'}) // 匹配 username 为 'alice_g' 的 User 节点
    RETURN u.bio                         // 返回该用户的 bio 属性
""").evaluate() # 获取查询结果

# 打印用户的简介
print(f"查询到的用户简介: {
                user_bio}") # 输出简介内容

布尔类型(Boolean Type):

Boolean: 存储 truefalse。对应 Python 的 TrueFalse。常用于表示状态,例如 is_active, is_verified

# 导入 Graph 和 Node 类
from py2neo import Graph, Node

# 连接到本地 Neo4j 数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 创建一个名为 'Article' 的节点
article_node = Node("Article") # 指定节点的标签为 "Article"

# 设置布尔型属性 'is_published' 为 True
article_node["is_published"] = True # 表示文章已发布

# 设置布尔型属性 'is_featured' 为 False
article_node["is_featured"] = False # 表示文章不是精选文章

# 设置一个标题
article_node["title"] = "深入理解图数据库" # 文章标题

# 将节点写入数据库
graph.create(article_node) # 执行创建操作

# 查询所有已发布的文章数量
published_count = graph.run("""
    MATCH (a:Article)           // 匹配所有 Article 节点
    WHERE a.is_published = true // 筛选出 is_published 为 true 的节点
    RETURN count(a)             // 返回匹配到的节点数量
""").evaluate() # 获取计数值

# 打印已发布文章的数量
print(f"已发布的文章数量: {
                published_count}") # 输出数量

列表类型(List/Array Type):

List: 属性值可以是一个列表(或数组)。重要的是,列表中的所有元素必须是相同的数据类型(homogenous)。它们不能是混合类型,例如 [1, "hello", True] 是不允许的。列表可以是任何其他原生类型的列表,例如 List<Integer>, List<String>, List<Point>。这对于存储标签、别名、历史值等非常有用。

# 导入 Graph 和 Node 类
from py2neo import Graph, Node

# 连接到本地 Neo4j 数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 创建一个名为 'Post' 的节点
post_node = Node("Post") # 节点的标签为 "Post"

# 设置一个字符串列表属性 'tags'
post_node["tags"] = ["neo4j", "python", "py2neo", "graphdb"] # 文章的标签列表

# 设置一个整数列表属性 'editor_ids'
post_node["editor_ids"] = [101, 105, 210] # 编辑过该文章的编辑ID列表

# 设置一个浮点数列表属性 'version_history_scores'
post_node["version_history_scores"] = [0.8, 0.95, 0.98] # 历史版本的评分

# 将节点写入数据库
graph.create(post_node) # 执行创建

# 查询包含 'python' 标签的帖子
# Cypher 中的 IN 操作符可以高效地检查列表中的成员
posts_with_python_tag = graph.run("""
    MATCH (p:Post)                // 匹配所有 Post 节点
    WHERE 'python' IN p.tags      // 筛选条件:'python' 字符串在节点的 'tags' 列表属性中
    RETURN p.editor_ids           // 返回该帖子的编辑ID列表
""").data() # 使用 .data() 获取所有查询结果记录的列表,每条记录是一个字典

# 打印查询结果
for record in posts_with_python_tag: # 遍历查询结果
    print(f"找到一个相关帖子,其编辑ID列表为: {
                record['p.editor_ids']}") # 打印编辑ID列表

属性的索引机制:

为属性创建索引是提升查询性能最直接、最有效的方式。当一个查询的 WHERE 子句、MATCH 的节点标识或 MERGEON 子句中使用了某个属性进行查找时,索引能够避免全库扫描(All-Nodes Scan 或 All-Relationships Scan),将查找复杂度从 O(N) 降低到 O(log N) 或 O(1) 的级别。

Neo4j 提供多种索引类型:

单属性索引(Single-Property Index): 这是最常见的索引类型。

B-Tree 索引: 这是默认的索引结构,适用于精确匹配 (=)、范围查询 (>, <, >=, <=) 以及前缀搜索 (STARTS WITH)。

# 导入 Graph 类
from py2neo import Graph

# 连接到数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 获取 schema 对象,用于管理索引和约束
schema = graph.schema

# 为标签 "User" 的 "username" 属性创建 B-Tree 索引
# 这是一个幂等操作,如果索引已存在,则不会执行任何操作
schema.create_index("User", "username") # 第一个参数是标签名,第二个参数是属性名

print("为 (User.username) 创建或确认了 B-Tree 索引。") # 打印确认信息

# 同样可以为关系属性创建索引
# schema.create_index("KNOWS", "since") # 假设 KNOWS 关系上有 since 属性

# 使用 EXPLAIN 来分析查询计划,确认索引是否被使用
# 在一个拥有大量 User 节点但没有索引的数据库中,这个查询会是 AllNodesScan
# 在创建索引后,这个查询会是 NodeIndexSeek,性能天差地别
query_plan = graph.run("""
    EXPLAIN MATCH (u:User)          // EXPLAIN 关键字用于展示查询计划,而不是执行查询
    WHERE u.username = 'alice_g'  // 基于被索引的属性进行查找
    RETURN u
""").data() # 获取查询计划的结果

# 打印查询计划以供分析
# 在输出中,你会寻找 "NodeIndexSeek" 或类似的字眼来确认索引生效
import json # 导入json模块以便更好地格式化打印
print(json.dumps(query_plan, indent=2)) # 使用json.dumps美化输出

# 删除索引
# schema.drop_index("User", "username") # 如果需要,可以删除索引
# print("删除了 (User.username) 的 B-Tree 索引。")

复合索引(Composite Index): 对多个属性同时建立索引。这对于需要同时根据多个属性进行精确匹配的查询非常有效。复合索引的顺序很重要。

# 导入 Graph 类
from py2neo import Graph

# 连接到数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 获取 schema 对象
schema = graph.schema

# 为 "Product" 标签的 "category" 和 "brand" 两个属性创建复合索引
# 当查询需要同时过滤 category 和 brand 时,这个索引会非常高效
schema.create_index("Product", "category", "brand") # 传入多个属性名

print("为 (Product.category, Product.brand) 创建或确认了复合索引。") # 打印确认信息

# 分析一个使用复合索引的查询
query_plan_composite = graph.run("""
    EXPLAIN MATCH (p:Product)                         // 分析一个匹配 Product 节点的查询
    WHERE p.category = 'electronics' AND p.brand = 'Neo' // 同时使用复合索引中的两个字段
    RETURN p.name
""").data() # 获取查询计划

# 打印查询计划,会看到 CompositeNodeIndexSeek 相关的操作
import json # 导入json模块
print(json.dumps(query_plan_composite, indent=2)) # 美化输出

全文索引(Full-Text Index): 专为自然语言文本搜索设计。它支持更复杂的查询,如模糊匹配、相关性评分等。全文索引使用 Apache Lucene 作为底层引擎。

# 导入 Graph 类
from py2neo import Graph

# 连接到数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 创建全文索引需要使用 Cypher,因为 py2neo 的 schema API 暂未直接封装它
# 首先,定义一个全文索引,名为 'article_titles_and_content'
# 这个索引会覆盖 Node 的 "Article" 和 "Paper" 标签
# 并且会索引它们的 "title" 和 "content" 属性
graph.run("""
    CREATE FULLTEXT INDEX article_titles_and_content FOR (n:Article|Paper) ON EACH [n.title, n.content]
    // CREATE FULLTEXT INDEX ... FOR ... ON EACH ... 是创建全文索引的Cypher语法
    // article_titles_and_content 是索引的名称
    // FOR (n:Article|Paper) 指定该索引应用于 Article 或 Paper 标签的节点
    // ON EACH [n.title, n.content] 指定要索引的属性列表
""")

print("创建了名为 'article_titles_and_content' 的全文索引。") # 打印确认信息

# 插入一些数据用于测试
graph.run("""
    CREATE (:Article {title: 'Learning py2neo basics', content: 'py2neo is a powerful library for python developers.'})
    // 创建一个Article节点及其属性
""")
graph.run("""
    CREATE (:Article {title: 'Advanced graph modeling', content: 'Modeling complex domains requires deep graph knowledge.'})
    // 创建另一个Article节点及其属性
""")

# 使用全文索引进行查询
# 使用 db.index.fulltext.queryNodes(indexName, queryString) 函数
search_results = graph.run("""
    CALL db.index.fulltext.queryNodes('article_titles_and_content', 'python powerful')
    // CALL db.index.fulltext.queryNodes 是调用全文索引进行查询的存储过程
    // 'article_titles_and_content' 是我们之前创建的索引名
    // 'python powerful' 是搜索的关键词,Lucene 会进行分词和匹配
    YIELD node, score // 查询过程会返回匹配的节点(node)和相关性得分(score)
    RETURN node.title AS title, score // 返回节点的标题和得分
""").data() # 获取查询结果

# 打印搜索结果
print("全文搜索结果:") # 打印标题
for record in search_results: # 遍历结果
    print(f"  标题: {
                record['title']}, 相关性得分: {
                record['score']}") # 打印每条记录的标题和得分

# 删除全文索引(如果需要)
# graph.run("DROP INDEX article_titles_and_content")
# print("删除了全文索引 'article_titles_and_content'。")

属性存储的内部视角:

Neo4j 将数据存储在一系列的存储文件(store files)中。

neostore.nodestore.db: 存储节点记录。每个节点记录都是固定大小的,包含指向其第一个关系、第一个属性以及标签存储的指针。
neostore.propertystore.db: 存储所有属性记录。这是一个巨大的文件,包含了数据库中所有的属性键值对。当一个节点或关系拥有属性时,其在 nodestorerelationshipstore 中的记录会包含一个指向 propertystore 中第一个属性记录的指针。如果该实体有多个属性,它们会形成一个链表结构。
neostore.propertystore.db.index.*: 这是 B-Tree 索引文件,用于加速属性查找。
neostore.propertystore.db.stringsneostore.propertystore.db.arrays: 用于存储动态大小的属性值,如字符串和列表。固定大小的属性(如 Integer, Float, Boolean)可以直接存储在属性记录中,而动态大小的则存储在这些专门的文件里,属性记录中只保存一个指向它们的指针。

这种存储结构的设计哲学是:

快速遍历: 节点和关系的记录是固定大小的,使得从一个节点到其邻居节点的指针跳转(图遍历的核心操作)非常快速,因为可以精确计算出下一条记录在文件中的偏移量。这被称为“无索引邻接”(Index-Free Adjacency)。
属性灵活性: 将属性分离存储,允许节点和关系拥有不同数量、不同类型的属性,提供了极高的模式灵活性(Schema-Free)。
性能权衡: 读取一个节点的属性需要额外的磁盘 I/O(指针跳转到 propertystore)。这就是为什么“密集的”属性(即一个节点/关系上属性非常多)可能会影响性能,以及为什么在设计模型时需要权衡哪些信息应该作为属性,哪些应该建模为独立的节点。

1.2 标签 vs. 类型 vs. 类别:语义建模、多标签与查询性能

在 Neo4j 中,节点通过**标签(Labels)进行分类,关系通过类型(Types)**进行分类。这是一个看似简单但极其重要的概念,深刻影响着数据模型的语义清晰度和查询性能。

标签(Labels)的深度理解:

一个节点可以有零个、一个或多个标签。这个特性是 Neo4j 模型表达能力强大的关键之一。

标签是节点的“类型”或“角色”: 将标签看作是节点在特定上下文中所扮演的角色。例如,一个节点可以同时拥有 UserModerator 两个标签,这清晰地表明该用户既是普通用户,也拥有版主权限。

多标签的威力:

继承/层次结构模拟: 虽然 Neo4j 没有内置的继承模型,但多标签可以优雅地模拟它。例如,可以有一个 Content 标签,然后有更具体的 Article, Video, Podcast 标签。一篇新文章的节点可以被打上 (:Article:Content) 两个标签。这样,你可以通过 :Content 查询所有类型的内容,也可以通过 :Article 精确查询文章。
状态机建模: 节点的生命周期状态可以用标签来管理。一个 Order 节点可以开始是 (:Order:Pending),然后变成 (:Order:Processing),再到 (:Order:Shipped),最后是 (:Order:Completed)。通过原子地添加和删除标签(SET n:NewLabel REMOVE n:OldLabel),可以清晰地追踪其状态,并且可以为特定状态的节点创建索引以加速查询。
切面/特征组合: 就像面向切面编程(AOP),多标签可以为节点附加各种正交的特征。例如,一个 Product 节点可以有 (:Product:OnSale:Featured:NewArrival) 等标签,这些标签可以动态地添加和移除,用于驱动不同的业务逻辑和UI展示。

# 导入 Graph, Node, Relationship 类
from py2neo import Graph, Node, Relationship

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 清理旧数据以便演示
graph.run("MATCH (n) DETACH DELETE n")

# --- 模拟继承 ---
# 创建一个基础"内容"节点,它也是一篇"文章"
article_node = Node("Content", "Article", title="图数据库入门", word_count=5000) # 同时给予两个标签
graph.create(article_node)

# 创建一个基础"内容"节点,它也是一个"视频"
video_node = Node("Content", "Video", title="Neo4j 安装教程", duration_seconds=300) # 同时给予两个标签
graph.create(video_node)

# 查询所有"内容"
all_content = graph.run("MATCH (c:Content) RETURN c.title, labels(c) AS labels").data()
# labels(c) 是一个 Cypher 函数,返回节点 c 的所有标签列表
print("
--- 所有内容 ---")
for item in all_content:
    print(f"标题: {
              item['c.title']}, 标签: {
              item['labels']}")

# 只查询所有"文章"
all_articles = graph.run("MATCH (a:Article) RETURN a.title").data()
print("
--- 所有文章 ---")
for item in all_articles:
    print(f"标题: {
              item['a.title']}")

# --- 状态机建模 ---
# 创建一个初始状态为 "Pending" 的订单
order_id = "ORD-2023-001"
order_node = Node("Order", "Pending", id=order_id, amount=299)
graph.create(order_node)
print(f"
创建了订单 {
              order_id},初始状态为 Pending。")

# 将订单状态更新为 "Shipped"
# 这是一个原子操作
update_query = """
MATCH (o:Order {id: $order_id}) // 匹配订单
REMOVE o:Pending                 // 移除 'Pending' 标签
SET o:Shipped, o.shipped_at = datetime() // 添加 'Shipped' 标签,并记录发货时间
RETURN labels(o) AS new_labels   // 返回更新后的标签
"""
result = graph.run(update_query, order_id=order_id).evaluate()
print(f"订单 {
              order_id} 状态已更新为 {
              result}。")

# --- 切面/特征组合 ---
# 创建一个产品
product_node = Node("Product", name="智能手表 V3", price=1299)
# 初始时它只是一个产品

# 把它标记为"新品"和"特价"
graph.run("""
    MATCH (p:Product {name: '智能手表 V3'}) // 匹配该产品
    SET p:NewArrival, p:OnSale              // 使用 SET 同时添加多个标签
""")
print("
产品 '智能手表 V3' 已被标记为新品和特价。")

# 查询所有特价商品
on_sale_products = graph.run("MATCH (p:OnSale) RETURN p.name, p.price").data()
print("
--- 当前特价商品 ---")
for p in on_sale_products:
    print(f"  产品: {
              p['p.name']}, 价格: {
              p['p.price']}")

# 一段时间后,它不再是"新品"
graph.run("""
    MATCH (p:Product {name: '智能手表 V3'}) // 匹配该产品
    REMOVE p:NewArrival                     // 移除 'NewArrival' 标签
""")
print("
产品 '智能手表 V3' 不再是新品。")

标签与查询性能的关系:

标签是 Neo4j 查询优化的第一道关卡。当你执行一个查询,如 MATCH (u:User) WHERE u.name = 'Alice',查询引擎的执行流程是:

标签查找: Neo4j 内部有一个从标签到节点 ID 的映射(可以看作是一个索引)。它会首先、并且极快地定位到所有带有 User 标签的节点集合。
属性过滤: 然后,在这个较小的节点集合中,它会再根据 WHERE 子句中的属性(u.name = 'Alice')进行过滤。如果 name 属性上有索引,这个过程会使用 B-Tree 索引(NodeIndexSeek);如果没有,它会扫描所有 User 节点(NodeByLabelScan)。

关键在于,查询永远不会从全库所有节点开始扫描(除非你写了 MATCH (n) 这样没有标签的查询)。指定标签,哪怕只有一个,也能将搜索范围急剧缩小。因此,MATCH 子句中总是指定至少一个标签是 Neo4j 查询的最佳实践

关系类型(Relationship Types)的深度理解:

与节点标签不同,一个关系只能有一个类型。关系类型定义了两个节点之间连接的确切语义

类型是关系的“动词”: 如果说节点是名词,那么关系类型就是动词或动词短语,它描述了从起始节点到结束节点发生了什么行为或存在什么联系。例如,(user)-[:PURCHASED]->(product)PURCHASED 就是这个关系的确切类型。
方向性至关重要: Neo4j 中所有关系都是有方向的。(a)-[:KNOWS]->(b)(b)-[:KNOWS]->(a) 可能表示相同的事实(如果“认识”是相互的),但它们是两个不同的关系实例。查询时可以忽略方向 MATCH (a)-[:KNOWS]-(b),但这只是语法糖,底层存储总是有方向的。方向性对于表达非对称关系(如 FOLLOWS, REPORTS_TO, CONTAINS)至关重要。

# 导入所需类
from py2neo import Graph, Node, Relationship

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 创建两个节点
alice = Node("User", name="Alice")
movie = Node("Movie", title="Inception")

# 创建一个带有明确类型的关系
# 关系本身也可以有属性
rel = Relationship(alice, "WATCHED", movie, rating=5, watched_at=datetime())
# alice 是起始节点
# "WATCHED" 是关系类型
# movie 是结束节点
# rating=5 和 watched_at=datetime()是该关系的属性

# 将节点和关系一次性创建到数据库中
graph.create(rel)
print("创建了 (Alice)-[:WATCHED]->(Inception) 的关系")

# 查询时,关系类型是核心的过滤条件
# 查找 Alice 看过的所有电影
watched_movies = graph.run("""
    MATCH (u:User {name: 'Alice'})-[r:WATCHED]->(m:Movie)
    // (u:User {name: 'Alice'}) 匹配 Alice 节点
    // -[r:WATCHED]-> 匹配一个从 u 出发,类型为 WATCHED 的关系,并将其赋给变量 r
    // (m:Movie) 匹配该关系指向的 Movie 节点
    RETURN m.title, r.rating // 返回电影标题和关系上的评分属性
""").data()

print("
Alice 看过的电影:")
for record in watched_movies:
    print(f"  电影: {
              record['m.title']}, 评分: {
              record['r.rating']}")

# 在查询中,你可以用 | 来指定多种关系类型
bob = Node("User", name="Bob")
review = Node("Review", text="Mind-blowing movie!")
graph.create(bob)
graph.create(review)
graph.create(Relationship(bob, "WROTE", review)) # Bob 写了评论
graph.create(Relationship(review, "IS_ABOUT", movie)) # 评论是关于这部电影的

# 查找所有与电影 "Inception" 直接或间接(通过评论)相关的用户
# 这是一个更复杂的模式匹配
related_users = graph.run("""
    MATCH (m:Movie {title: 'Inception'})
    // 首先匹配电影 "Inception"
    MATCH (u:User)-[:WATCHED|WROTE*1..2]->(m)
    // 匹配用户 u,该用户通过 WATCHED 或 WROTE 关系,经过 1 到 2 跳可以到达电影 m
    // *1..2 是可变长度路径,表示关系链的长度可以是1或2
    // | 表示或,即关系类型可以是 WATCHED 或 WROTE
    RETURN DISTINCT u.name // 返回去重后的用户名
""").data()

print("
与电影 'Inception' 相关的用户:")
for record in related_users:
    print(f"  用户: {
              record['u.name']}")

语义建模中的权衡:

在设计图模型时,经常会遇到一个问题:某个信息应该作为属性标签,还是一个独立的节点

作为属性: 当信息是描述性数据,是节点/关系的内在特征,并且不具有复杂的内部结构或关系时。例如,用户的 age、文章的 word_count

优点: 查询简单直接 (node.property),存储紧凑。
缺点: 无法为属性本身添加属性或关系。你不能把 (user)-[:LIVES_IN]->(user.city) 这样的关系连接到 city 这个属性上。

作为标签: 当信息代表节点的“类别”、“状态”或“角色”,并且你希望根据这个分类进行高效的节点分组和查询时。例如,User, Admin, Article, Video, Order:Shipped

优点: 极高的查询性能,清晰地表达节点的分类。
缺点: 标签本身是元数据,不能有属性。你不能给 Shipped 这个标签添加一个 shipping_company 属性。

作为节点: 当信息是一个“实体”,它自身可以有属性,并且可以和其他实体建立复杂关系时。例如,City, Company, Tag

优点: 最大的灵活性。City 节点可以有 population 属性,可以连接到 Country 节点 (city)-[:LOCATED_IN]->(country)
缺点: 增加了模型的复杂性和图中的节点/关系数量,可能会使简单查询变得更长。

决策案例分析:

假设我们要对用户的“技能(Skills)”进行建模。

方案一:属性(列表)

模型: (u:User {skills: ['Python', 'Neo4j', 'Docker']})
优点: 非常简单,添加/读取技能很快。
缺点:

无法表达技能的熟练度(Proficiency)。
无法将 Python 这个技能本身作为一个实体进行查询(例如,查找所有拥有 Python 技能的用户,需要扫描所有用户节点的 skills 列表,效率较低)。
无法将技能与其他实体关联,比如哪个项目使用了这个技能。

方案二:独立节点

模型: (u:User)-[:HAS_SKILL {proficiency: 'Expert'}]->(s:Skill {name: 'Python'})
优点:

语义最丰富: 关系上可以存储熟练度。Skill 节点本身可以有 description 属性。
查询能力最强: 可以轻松地从 Skill 节点出发,找到所有具备该技能的用户 MATCH (s:Skill {name: 'Python'})<-[:HAS_SKILL]-(u:User) RETURN u。这个查询可以利用 Skill.name 上的索引,极其高效。
扩展性最好: 以后可以添加 (s:Skill)-[:USED_IN]->(p:Project) 这样的关系。

缺点: 模型更复杂,写入数据时需要创建节点和关系,而不是仅仅更新一个属性。

第一章:超越节点与关系:深度剖析标签属性图模型(LPG)(续)

1.3 虚拟关系与图算法的基石:当关系不再是关系

在标准的 Neo4j 数据模型中,关系是显式存储的,是数据库中的一等公民。然而,在进行高级图分析和算法应用时,我们经常会遇到“虚拟关系”(Virtual Relationships)的概念。虚拟关系并不作为持久化数据存储在数据库中,而是在查询执行期间,由图算法或特定查询动态计算和生成。理解虚拟关系是从数据建模者转向图分析师的关键一步。

虚拟关系的核心思想:

虚拟关系的核心思想是基于节点属性或拓扑结构即时推断出新的连接。这使得我们可以在不改变底层物理存储的情况下,探索数据中隐藏的、高阶的、或特定于分析场景的联系。

应用场景一:基于属性相似度的虚拟关系

假设我们有一个用户图,用户节点拥有多个属性,比如兴趣标签、所在城市、年龄等。我们可能想分析“相似用户”的社群,但“相似”这个关系在数据库中并不存在。我们可以动态地定义它。

Jaccard 相似度: 衡量两个集合相似度的经典指标。公式为:J(A, B) = |A ∩ B| / |A ∪ B|。我们可以用它来计算两个用户兴趣标签的相似度。

py2neo 结合 Neo4j 的图数据科学库(Graph Data Science Library, GDS)可以非常优雅地处理这类问题。GDS 允许我们在内存中创建一个图的投影(in-memory graph projection),然后在这个投影上运行算法,包括生成虚拟关系。

# 导入所需库
from py2neo import Graph

# 连接到数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 准备演示数据:创建一些用户和他们的兴趣
graph.run("MATCH (n) DETACH DELETE n") # 清理环境
graph.run("""
    CREATE (u1:User {id: 1, name: 'Alice', interests: ['graph', 'python', 'music']})
    // 创建用户Alice,并设置她的兴趣列表
    CREATE (u2:User {id: 2, name: 'Bob', interests: ['python', 'ml', 'data']})
    // 创建用户Bob,并设置他的兴趣列表
    CREATE (u3:User {id: 3, name: 'Charlie', interests: ['graph', 'python', 'neo4j']})
    // 创建用户Charlie,并设置他的兴趣列表
    CREATE (u4:User {id: 4, name: 'David', interests: ['sports', 'cooking']})
    // 创建用户David,他的兴趣与其他人完全不同
""")
print("用户和兴趣数据创建完毕。")

# GDS 工作流程第一步:创建图投影
# 我们将 User 节点和它们之间的关系(这里没有显式关系,所以只投射节点)加载到内存中
# 我们将 'interests' 属性也加载进去,以便算法使用
# 'user_interests_graph' 是我们给这个内存图起的名字
create_projection_query = """
    CALL gds.graph.project(
        'user_interests_graph',      // 内存图的名称
        'User',                      // 要投射的节点标签
        '*',                         // 要投射的关系类型('*' 表示所有)
        {
            nodeProperties: 'interests' // 要加载到内存中的节点属性
        }
    )
    YIELD graphName, nodeCount, relationshipCount
    // gds.graph.project 是GDS库中用于创建图投影的程序
    // YIELD 子句用于获取程序执行的返回结果
    RETURN graphName, nodeCount, relationshipCount
    // 返回图名称、节点数和关系数
"""
projection_result = graph.run(create_projection_query).data()
print(f"GDS 图投影创建成功: {
              projection_result}")

# GDS 工作流程第二步:在图投影上运行节点相似度算法
# 我们使用 gds.nodeSimilarity.write 算法
# 它会计算节点间的 Jaccard 相似度,并将相似度大于阈值的节点对之间用虚拟关系连接起来
# 这个算法会把结果写回(materialize)到 Neo4j 数据库中,形成新的、显式的 :SIMILAR 关系
similarity_query = """
    CALL gds.nodeSimilarity.write(
        'user_interests_graph',      // 在哪个图投影上运行
        {
            writeRelationshipType: 'SIMILAR', // 生成的关系的类型
            writeProperty: 'jaccardScore',    // 存储相似度分数的属性名
            similarityCutoff: 0.1             // 相似度阈值,低于此值不创建关系
        }
    )
    YIELD nodesCompared, relationshipsWritten
    // gds.nodeSimilarity.write 是计算相似度并写回关系的程序
    // writeRelationshipType 定义了新创建关系的类型
    // writeProperty 定义了存储分数的属性
    // similarityCutoff 是一个过滤器,只有相似度高于这个值的才会创建关系
    RETURN nodesCompared, relationshipsWritten
    // 返回比较的节点对数量和实际创建的关系数量
"""
similarity_result = graph.run(similarity_query).data()
print(f"节点相似度计算并写回完成: {
              similarity_result}")

# GDS 工作流程第三步:查询新生成的关系
# 现在数据库中物理上存在了 :SIMILAR 关系
similar_users = graph.run("""
    MATCH (u1:User)-[r:SIMILAR]->(u2:User) // 匹配新创建的 SIMILAR 关系
    WHERE u1.id < u2.id // 避免重复 (A,B) 和 (B,A)
    RETURN u1.name AS user1, u2.name AS user2, r.jaccardScore AS score // 返回用户名和相似度分数
    ORDER BY score DESC // 按分数降序排列
""").data()

print("
基于兴趣计算出的相似用户关系:")
for rel in similar_users:
    print(f"  用户 '{
              rel['user1']}' 和 '{
              rel['user2']}' 相似,Jaccard 分数: {
              rel['score']:.4f}")
    // 打印相似的用户对和他们的Jaccard分数,格式化为4位小数

# GDS 工作流程第四步:清理内存图投影
graph.run("CALL gds.graph.drop('user_interests_graph')")
print("
内存中的图投影已被删除。")

在这个例子中,:SIMILAR 关系就是一种被“实体化”(Materialized)的虚拟关系。它在算法运行之前不存在,是基于 interests 属性动态计算出来的。将其写回数据库,可以极大地方便后续的社群发现、推荐系统等查询。

应用场景二:基于路径的虚拟关系

图的真正威力在于路径。有时,两个节点之间是否存在某种特定模式的路径,比它们之间是否有直接连接更重要。

例如,在一个金融交易网络中,我们可能想定义一个 SUSPICIOUSLY_LINKED 的虚拟关系。这个关系成立的条件是:如果账户 A 通过不超过 4 次转账(TRANSFER 关系),将资金转移到了账户 B,并且路径上经过了一个被标记为 Risky 的账户。

这种关系完全是基于拓扑结构的,我们可以在查询时动态发现它。

# 导入所需库
from py2neo import Graph, Node, Relationship

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 准备数据:构建一个简单的交易网络
graph.run("MATCH (n) DETACH DELETE n") # 清理环境
graph.run("""
    // 创建账户节点
    CREATE (a1:Account {id: 'A001'})
    CREATE (a2:Account {id: 'A002'})
    CREATE (a3:Account:Risky {id: 'A003'}) // a3 是一个高风险账户
    CREATE (a4:Account {id: 'A004'})
    CREATE (a5:Account {id: 'A005'})
    CREATE (a6:Account {id: 'A006'}) // 无关账户
    
    // 创建交易关系
    CREATE (a1)-[:TRANSFER {amount: 100}]->(a2)
    CREATE (a2)-[:TRANSFER {amount: 200}]->(a3) // 交易经过了高风险账户 a3
    CREATE (a3)-[:TRANSFER {amount: 150}]->(a4)
    CREATE (a4)-[:TRANSFER {amount: 50}]->(a5)
    
    CREATE (a1)-[:TRANSFER {amount: 999}]->(a6) // 另一条不经过风险账户的路径
""")
print("金融交易网络数据已创建。")

# 查询“可疑连接”的虚拟关系
# 我们不创建新关系,而是在查询中直接找到满足条件的节点对
suspicious_link_query = """
    MATCH path = (start_node:Account)-[:TRANSFER*1..4]->(end_node:Account)
    // 匹配一个从 start_node 到 end_node,长度为 1 到 4 跳的 TRANSFER 路径
    // path 是这个路径的变量名
    
    WHERE start_node <> end_node // 确保起点和终点不是同一个账户
      AND ANY(node IN nodes(path) WHERE 'Risky' IN labels(node))
      // 关键条件:路径上的任意一个节点(node IN nodes(path)),其标签中包含 'Risky'
      // nodes(path) 是一个 Cypher 函数,返回路径上的所有节点列表
      // ANY(...) 是一个断言函数,当列表中的任何元素满足内部的 WHERE 条件时返回 true
      
    RETURN DISTINCT start_node.id AS source, end_node.id AS destination
    // 返回所有满足条件的起点和终点对,并去重
"""

suspicious_links = graph.run(suspicious_link_query).data()

print("
发现的“可疑连接”(虚拟关系):")
for link in suspicious_links:
    print(f"  账户 '{
              link['source']}' 与账户 '{
              link['destination']}' 存在可疑资金链路。")
    // 打印查询到的可疑连接

在这个例子中,(start_node)(end_node) 之间的 SUSPICIOUSLY_LINKED 关系完全是虚拟的。它由 MATCH 子句中的路径模式和 WHERE 子句中的条件共同定义。我们没有在数据库里存储任何东西,但却能查询到这种复杂的、高阶的联系。

虚拟关系与图算法

几乎所有的图算法,其核心都是在处理虚拟关系。

最短路径(Shortest Path): 算法动态计算出节点间最短的路径,这条路径就是一种虚拟关系,代表了“最有效率的连接”。
社区发现(Community Detection,如 Louvain, Label Propagation): 算法根据图的拓扑结构(边的密度)将节点分组。属于同一个社区,就是一种虚拟关系,代表“紧密连接的群体成员”。
中心性算法(Centrality,如 PageRank, Betweenness): 算法为每个节点计算一个分数,这个分数反映了节点在网络中的重要性。我们可以基于这个分数创建虚拟关系,例如,连接所有 PageRank 分数高于某个阈值的节点,形成“核心影响力网络”。

理解了虚拟关系,就等于掌握了将静态的图数据转化为动态的、富有洞察力的分析结果的钥匙。它将图数据库的应用从简单的数据存储和检索,提升到了复杂的网络分析和智能发现的层面。


第二部分:Py2neo 核心架构与高级编程

第二章:不仅仅是驱动:Py2neo 的架构哲学与对象模型

py2neo 并不仅仅是一个简单的 Bolt 协议驱动程序。它的设计哲学是提供一个高度 Pythonic 的接口,将 Neo4j 的图概念(节点、关系、路径)映射为 Python 对象,从而让开发者能够以面向对象的方式与图数据库进行交互。深入理解其架构和对象模型,是编写高效、可维护、优雅的 py2neo 代码的前提。

2.1 Graph 对象:连接、事务与图元数据管理的入口

Graph 对象是 py2neo 世界的中心。它代表了与一个 Neo4j 数据库实例的活动连接,是所有数据库操作的起点。

创建连接:超越简单的 auth

我们已经见过 Graph("bolt://localhost:7687", auth=("neo4j", "password")) 这种基本形式。但 Graph 的构造函数提供了更多精细的控制。

# 导入 Graph 类和一些配置选项
from py2neo import Graph
from py2neo.config import LIVENESS_CHECK_TIMEOUT, MAX_CONNECTION_LIFETIME

# 一个更完整的 Graph 对象初始化示例
graph_service = Graph(
    "neo4j+s://your-aura-instance.databases.neo4j.io", // 使用 neo4j+s 协议,表示通过 Bolt+Routing 和 SSL/TLS 加密连接
    auth=("neo4j", "your_aura_db_password"), // 你的 AuraDB 或其他数据库的凭证
    name="my_application_graph", // 为这个 Graph 对象起一个逻辑名称,有助于调试和区分
    user_agent="MyApp/1.0.0", // 设置用户代理字符串,会在服务器日志中显示,便于追踪来源
    max_connection_pool_size=50, // 设置连接池的最大连接数,默认是 100
    liveness_check_timeout=30, // 设置连接活性检查的超时时间(秒),默认是 60。一个连接在被从池中取出使用前,如果空闲超过这个时间,会先发送一个心跳包检查是否存活
    max_connection_lifetime=3600 // 一个连接在池中的最大存活时间(秒),默认是 3600。到期后会被关闭并移除,防止因网络设备等问题导致的“僵尸连接”
)

# 验证连接是否成功
# graph_service.database.name 会连接到数据库并获取其名称
try:
    db_name = graph_service.database.name
    print(f"成功连接到数据库 '{
              db_name}'。")
    print(f"此连接的用户代理是: {
              graph_service.user_agent}")
    // 打印用户代理,确认配置生效
except Exception as e:
    print(f"连接失败: {
              e}")
    // 打印异常信息

协议(Scheme):

bolt: 标准的 Bolt 协议,明文传输。
bolt+s: Bolt over TLS/SSL,加密传输。
bolt+ssc: Bolt over TLS/SSL,但禁用证书验证(不推荐在生产环境使用)。
neo4j: 使用 Bolt+Routing 协议,这是连接到 Neo4j 因果集群(Causal Cluster)的标准方式。驱动会自动发现集群中的主节点(Leader)和从节点(Follower),并将写操作路由到主节点,读操作路由到从节点,实现读写分离和高可用。
neo4j+s: Encrypted Bolt+Routing,生产环境连接集群的首选。
neo4j+ssc: Encrypted Bolt+Routing,禁用证书验证。

连接池(Connection Pool): py2neo 内部维护一个连接池以提高性能。每次执行查询时,它会从池中获取一个连接,用完后归还,而不是每次都创建新的 TCP 连接。max_connection_pool_sizeliveness_check_timeout 等参数就是用来微调这个连接池的行为。对于高并发应用,合理配置连接池至关重要。

事务管理:原子性的保证

Graph 对象是事务的起点。虽然 graph.run()graph.create()graph.merge() 这些便捷方法会自动处理事务(隐式事务,Auto-Commit Transaction),但在复杂的业务逻辑中,我们需要显式地控制事务边界。

显式事务的威力:

假设我们要执行一个银行转账操作:从账户 A 扣款,给账户 B 存款。这两个操作必须是一个原子单元,要么都成功,要么都失败。

# 导入所需库
from py2neo import Graph, TransactionError

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 准备数据:创建两个账户并设置初始余额
graph.run("MATCH (n) DETACH DELETE n") # 清理环境
graph.run("""
    CREATE (:Account {id: 'ACC1001', balance: 1000.0})
    // 创建账户ACC1001,余额1000.0
    CREATE (:Account {id: 'ACC2002', balance: 500.0})
    // 创建账户ACC2002,余额500.0
""")
print("账户初始状态设置完毕。")

def transfer_money(tx, from_acc_id, to_acc_id, amount):
    """
    在给定的事务(tx)中执行转账的核心逻辑。
    """
    # 第一步:查询付款方账户的余额
    from_balance_query = "MATCH (a:Account {id: $acc_id}) RETURN a.balance AS balance"
    // 定义查询余额的Cypher语句
    from_balance = tx.run(from_balance_query, acc_id=from_acc_id).evaluate()
    // 在事务中执行查询,并获取结果

    # 检查余额是否充足
    if from_balance is None or from_balance < amount:
        // 如果账户不存在或余额不足
        raise ValueError(f"账户 {
              from_acc_id} 余额不足或不存在。")
        // 抛出异常,这将导致事务回滚

    # 第二步:执行扣款操作
    debit_query = """
        MATCH (a:Account {id: $acc_id})
        SET a.balance = a.balance - $amount
        // 匹配付款方账户,并更新其余额
    """
    tx.run(debit_query, acc_id=from_acc_id, amount=amount)
    // 在事务中执行扣款

    # 第三步:执行存款操作
    credit_query = """
        MATCH (a:Account {id: $acc_id})
        SET a.balance = a.balance + $amount
        // 匹配收款方账户,并更新其余额
    """
    tx.run(credit_query, acc_id=to_acc_id, amount=amount)
    // 在事务中执行存款

    # 第四步(可选):记录交易关系
    log_transfer_query = """
        MATCH (from:Account {id: $from_id})
        MATCH (to:Account {id: $to_id})
        CREATE (from)-[:TRANSFERRED_TO {amount: $amount, timestamp: datetime()}]->(to)
        // 创建一条关系来记录这笔交易
    """
    tx.run(log_transfer_query, from_id=from_acc_id, to_id=to_acc_id, amount=amount)
    // 在事务中执行关系创建

    print(f"在事务中成功准备了从 {
              from_acc_id} 到 {
              to_acc_id} 的转账 {
              amount} 元的操作。")
    // 打印日志

# 使用 Python 的 'with' 语句来管理事务,这是最佳实践
try:
    with graph.begin() as tx: // graph.begin() 开始一个新事务,并返回一个 Transaction 对象
        # 'with' 语句会确保:如果代码块成功执行完毕,事务会自动调用 tx.commit()
        # 如果代码块中发生任何异常,事务会自动调用 tx.rollback()
        transfer_money(tx, 'ACC1001', 'ACC2002', 150.0)
        // 调用转账函数,传入当前事务对象 tx
    print("转账成功,事务已提交。")
    // 如果没有异常,打印成功信息

except (ValueError, TransactionError) as e:
    # 捕获我们自定义的 ValueError 或 py2neo 的 TransactionError
    print(f"转账失败,事务已回滚。原因: {
              e}")
    // 打印失败信息

# 验证最终结果
final_balances = graph.run("MATCH (a:Account) RETURN a.id AS id, a.balance AS balance").data()
print("
转账后的最终账户余额:")
for acc in final_balances:
    print(f"  账户 {
              acc['id']}: {
              acc['balance']}")
    // 打印每个账户的最终余额,以验证操作的原子性

with graph.begin() as tx: 的模式是 py2neo 中进行事务操作的黄金标准。它利用了 Python 的上下文管理器协议,极大地简化了代码,并保证了事务的完整性,避免了忘记 commitrollback 的情况。

访问图元数据:graph.schema

graph.schema 属性是一个强大的工具,用于查询数据库的“模式”信息,比如所有的标签、关系类型、索引和约束。

# 导入所需库
from py2neo import Graph

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 获取 schema 对象
schema = graph.schema
// schema 属性返回一个 Schema 对象,用于与数据库的元数据交互

# --- 查询标签和关系类型 ---
all_labels = schema.labels
// .labels 属性会查询并返回数据库中所有节点标签的 frozenset
all_rel_types = schema.relationship_types
// .relationship_types 属性会查询并返回所有关系类型的 frozenset

print(f"数据库中的所有标签: {
              all_labels}")
// 打印所有标签
print(f"数据库中的所有关系类型: {
              all_rel_types}")
// 打印所有关系类型

# --- 管理索引 ---
# 我们在之前的章节已经见过 schema.create_index 和 schema.drop_index
# 我们也可以查询已存在的索引
user_indexes = schema.get_indexes("User")
// get_indexes 方法接收一个标签名,返回该标签上所有索引的列表
print(f"
'User' 标签上的索引: {
              user_indexes}")
// 打印'User'标签的索引信息

# --- 管理约束 ---
# 约束比索引更强,它能保证数据的唯一性或存在性
# 创建唯一性约束:保证所有 User 节点的 id 属性是唯一的
try:
    schema.create_uniqueness_constraint("User", "id")
    // create_uniqueness_constraint 方法用于创建唯一性约束
    // 第一个参数是标签名,第二个参数是属性名
    print("为 (User.id) 创建了唯一性约束。")
except Exception as e:
    print(f"创建 (User.id) 唯一性约束失败或已存在: {
              e}")
    // 如果约束已存在,会抛出异常

# 创建节点属性存在性约束:保证所有 Post 节点都必须有 title 属性
try:
    # 假设我们有 Post 节点
    graph.run("CREATE (:Post {title:'a', content:'b'})")
    schema.create_node_property_existence_constraint("Post", "title")
    // create_node_property_existence_constraint 用于创建属性存在约束
    print("为 (Post.title) 创建了节点属性存在性约束。")
    
    # 下面的操作会失败,因为它违反了约束
    # graph.run("CREATE (:Post {content: 'some content without title'})")
    
except Exception as e:
    print(f"创建 (Post.title) 存在性约束失败或已存在: {
              e}")

# 查询所有约束
all_constraints = schema.get_constraints()
// get_constraints 方法返回数据库中所有的约束列表
print("
数据库中的所有约束:")
for constraint in all_constraints:
    print(f"  - {
              constraint}")
    // 打印每个约束的详细信息

# 删除约束(如果需要)
# schema.drop_uniqueness_constraint("User", "id")
# print("
已删除 (User.id) 的唯一性约束。")

使用 schema 对象来程序化地管理索引和约束,对于自动化部署、数据库迁移和确保数据一致性的应用至关重要。例如,在一个应用启动时,可以编写脚本检查并确保所有必要的索引和约束都已存在,如果不存在则自动创建。

**第二章:不仅仅是驱动:Py2neo 的架构哲学与对象模型 **

2.2 Node, Relationship, Path:图元素的 Python 化表达

py2neo 最核心的设计理念之一就是将 Neo4j 的基本图元素——节点(Node)、关系(Relationship)和路径(Path)——无缝地映射到 Python 的对象模型中。这使得开发者可以使用熟悉的面向对象编程范式来操作图数据,而不是仅仅停留在 Cypher 字符串拼接的层面。理解这些对象的内部结构和行为是掌握 py2neo 高级用法的关键。

2.2.1 Node 对象:标签、属性与唯一标识

Node 对象是 py2neo 中表示 Neo4j 节点的基石。它封装了节点的标签(Labels)和属性(Properties)。

构造 Node 对象:

创建 Node 对象有两种主要方式:

无状态创建(Unbound Node): 这种方式创建的 Node 对象尚未与数据库中的实际节点关联。它只存在于 Python 程序的内存中,通常用于构建新的节点,然后将其持久化到数据库中。

# 导入 Node 类
from py2neo import Node

# 方式一:直接指定标签和属性
# 创建一个 User 节点,并为其指定 name 和 age 属性
new_user = Node("User", name="Alice", age=30)
// Node 的第一个参数是标签,后续的关键字参数是属性
// 这个节点目前只存在于Python程序的内存中,尚未写入数据库

# 方式二:先创建,再添加标签和属性
# 创建一个空的节点对象
empty_node = Node()
// 此时它没有任何标签和属性

# 添加标签
empty_node.add_label("Product")
// 使用 add_label 方法添加标签
empty_node.add_label("Electronics")
// 一个节点可以有多个标签

# 添加属性(通过字典式访问)
empty_node["name"] = "Wireless Headphone"
// 通过方括号语法设置属性
empty_node["price"] = 199.99
// 可以设置不同类型的属性

# 也可以在创建时通过传入字典来设置属性
another_product = Node("Product", {
              "name": "Smart Watch", "sku": "SW001"})
// 第二个参数可以直接传入一个字典作为属性

print(f"新用户节点:标签={
                new_user.labels}, 属性={
                new_user.properties}")
// 打印 new_user 节点的标签和属性
print(f"空节点添加标签和属性后:标签={
                empty_node.labels}, 属性={
                empty_node.properties}")
// 打印 empty_node 的标签和属性
print(f"另一个产品节点:标签={
                another_product.labels}, 属性={
                another_product.properties}")
// 打印 another_product 节点的标签和属性

内部机制透视: 当你创建 Node 对象时,py2neo 在其内部维护一个字典来存储属性,以及一个集合(set)来存储标签。这些操作都是纯内存操作,不会与数据库进行任何交互。

绑定到数据库(Bound Node): 当从数据库中读取节点时,py2neo 会自动创建与数据库中真实节点关联的 Node 对象。这些对象具有一个 id 属性(即 Neo4j 内部的节点 ID),并且其状态与数据库中的节点同步。

# 导入 Graph, Node 类
from py2neo import Graph

# 连接到数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 确保数据库中有一些数据用于查询
graph.run("MATCH (n) DETACH DELETE n") # 清理环境
graph.run("""
    CREATE (:Person {name: 'Bob', city: 'London'})
    // 创建一个名为 Bob 的 Person 节点
    CREATE (:Person {name: 'Charlie', city: 'Paris'})
    // 创建一个名为 Charlie 的 Person 节点
""")
print("数据已创建。")

# 从数据库中查询一个节点
# 使用 graph.nodes.match() 方法是 py2neo 推荐的、更 Pythonic 的查询方式
bob_node = graph.nodes.match("Person", name="Bob").first()
// .match("Person", name="Bob") 会查找标签为 "Person" 且 name 属性为 "Bob" 的节点
// .first() 返回匹配到的第一个节点对象,如果没有则返回 None

if bob_node:
    print(f"
从数据库中查询到的 Bob 节点:")
    print(f"  内部ID: {
                bob_node.identity}") # .identity 属性存储 Neo4j 内部的节点ID
    print(f"  标签: {
                bob_node.labels}")
    print(f"  属性: {
                bob_node.properties}")
    
    # 此时 bob_node 是一个绑定节点,它的状态与数据库中的节点一致
    # 修改它的属性,并使用 graph.push() 同步到数据库
    bob_node["age"] = 25 # 修改本地对象的属性
    graph.push(bob_node) # 将本地对象的修改推送到数据库
    print("
Bob 的年龄已更新到数据库。")
    
    # 重新查询 Bob 节点来验证更新
    updated_bob = graph.nodes.match("Person", name="Bob").first()
    if updated_bob:
        print(f"  重新查询后 Bob 的年龄: {
                updated_bob['age']}")
else:
    print("未找到 Bob 节点。")

内部机制透视: bound_node.identity 属性存储了 Neo4j 数据库中该节点的唯一 ID。当 py2neo 从数据库中获取数据并反序列化为 Node 对象时,它会将这个 ID 赋值给 identity。对于绑定节点,py2neo 提供了 graph.push()graph.pull() 方法来手动同步 Python 对象与数据库之间的状态。push() 将本地对象的更改写入数据库,pull() 从数据库读取最新状态更新本地对象。

Node 对象的方法和属性:

labels: 一个 frozenset 类型,包含节点的所有标签。frozenset 是不可变的,确保标签的集合性。
properties: 一个字典(dict),包含节点的所有属性。
identity: 节点的内部 Neo4j ID。只有绑定节点才有此属性。
add_label(label): 添加一个新标签。
remove_label(label): 移除一个标签。
clear_labels(): 移除所有标签。
update(properties_dict): 使用字典更新属性,可以添加新属性或覆盖现有属性。

# 导入 Node 类
from py2neo import Node

# 创建一个无状态节点
my_object = Node("Item", "Digital", name="Laptop", price=1200)

print(f"原始标签: {
              my_object.labels}") # 获取所有标签
print(f"原始属性: {
              my_object.properties}") # 获取所有属性

# 添加新标签
my_object.add_label("BestSeller")
print(f"添加 BestSeller 标签后: {
              my_object.labels}")

# 移除标签
my_object.remove_label("Digital")
print(f"移除 Digital 标签后: {
              my_object.labels}")

# 更新属性
my_object.update({
            "price": 1150, "color": "Silver"})
// 使用字典更新属性,如果键存在则覆盖,不存在则添加
print(f"更新属性后: {
              my_object.properties}")

# 获取单个属性
item_name = my_object["name"]
// 像字典一样通过键获取属性值
print(f"获取的商品名称: {
              item_name}")

# 删除属性
del my_object["color"]
// 像字典一样删除属性
print(f"删除 color 属性后: {
              my_object.properties}")

# 尝试获取不存在的属性会报错,需要用 .get() 或者检查
# print(my_object["non_existent_property"]) # 这会抛出 KeyError
print(f"尝试获取不存在的属性: {
              my_object.get('non_existent_property', 'Default Value')}")
// 使用 get 方法获取属性,如果不存在则返回默认值
2.2.2 Relationship 对象:类型、属性与方向

Relationship 对象代表 Neo4j 中的关系。它连接两个节点,有一个类型,并可以有属性。

构造 Relationship 对象:

Relationship 的构造函数需要三个核心参数:起始节点、关系类型、结束节点。

# 导入 Node, Relationship 类
from py2neo import Node, Relationship

# 创建两个独立的节点对象(无状态)
node_a = Node("Person", name="Alice")
node_b = Node("Person", name="Bob")

# 创建一个关系对象
# 关系类型为 "KNOWS",关系上的属性为 'since' 和 'weight'
rel_knows = Relationship(node_a, "KNOWS", node_b, since=2020, weight=0.8)
// Relationship(start_node, type_string, end_node, **properties)
// 第一个参数是起始节点
// 第二个参数是关系类型字符串
// 第三个参数是结束节点
// 之后的关键字参数是关系的属性

print(f"关系类型: {
              rel_knows.type}") # 获取关系类型
print(f"关系属性: {
              rel_knows.properties}") # 获取关系属性

# 访问关系的起点和终点
print(f"关系起点名称: {
              rel_knows.start_node['name']}")
// 访问关系的 start_node 属性,然后获取其 name 属性
print(f"关系终点名称: {
              rel_knows.end_node['name']}")
// 访问关系的 end_node 属性,然后获取其 name 属性

# 关系也可以通过传入字典设置属性
rel_bought = Relationship(node_a, "BOUGHT", node_b, {
            "quantity": 2, "date": "2023-01-15"})
// 使用字典设置关系属性

print(f"关系类型: {
              rel_bought.type}, 属性: {
              rel_bought.properties}")

绑定关系:graph.create()graph.merge()

与节点类似,创建 Relationship 对象后,需要通过 graph.create()graph.merge() 方法将其持久化到数据库。当从数据库中查询关系时,py2neo 也会返回绑定关系对象。

# 导入 Graph, Node, Relationship 类
from py2neo import Graph, Node, Relationship

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 清理环境
graph.run("MATCH (n) DETACH DELETE n")

# 创建并存储两个节点
alice = Node("Person", name="Alice")
bob = Node("Person", name="Bob")
graph.create(alice) # 确保节点已存在于数据库,因为关系需要它们的数据库ID
graph.create(bob)

# 创建并存储关系
# 注意:此时 alice 和 bob 已经是绑定节点了,有 .identity
friendship = Relationship(alice, "FRIENDS_WITH", bob, since="2022-01-01")
graph.create(friendship)
print("Alice 和 Bob 之间的 FRIENDS_WITH 关系已创建。")

# 查询这个关系
# 从数据库中获取绑定关系
queried_rel = graph.relationships.match((alice, bob), "FRIENDS_WITH").first()
// .match((start_node, end_node), type_string) 匹配指定起点、终点和类型的关系
// 这里的 alice 和 bob 必须是绑定节点

if queried_rel:
    print(f"
查询到的关系:")
    print(f"  关系ID: {
              queried_rel.identity}")
    // 绑定关系也有 identity 属性
    print(f"  类型: {
              queried_rel.type}")
    print(f"  属性: {
              queried_rel.properties}")
    print(f"  起点 (ID): {
              queried_rel.start_node.identity}, 姓名: {
              queried_rel.start_node['name']}")
    // 关系对象上的 start_node 和 end_node 属性会是绑定节点对象
    print(f"  终点 (ID): {
              queried_rel.end_node.identity}, 姓名: {
              queried_rel.end_node['name']}")
    
    # 修改关系属性并推送回数据库
    queried_rel["strength"] = 0.95
    // 修改关系上的属性
    graph.push(queried_rel)
    // 推送更改到数据库
    print("
关系属性 'strength' 已更新。")
else:
    print("未找到关系。")

关系对象的方法和属性:

type: 关系的类型字符串。
start_node: 关系的起始节点(一个 Node 对象)。
end_node: 关系的结束节点(一个 Node 对象)。
properties: 一个字典,包含关系的所有属性。
identity: 关系的内部 Neo4j ID。
set_type(new_type): 不推荐,关系类型在创建后通常不应更改。
update(properties_dict): 更新关系属性。

方向性与遍历:

py2neo 中,Relationship 对象总是从 start_node 指向 end_node。当你遍历图时,可以根据需要指定方向性或忽略方向。

# 导入 Graph, Node, Relationship 类
from py2neo import Graph, Node, Relationship

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n) DETACH DELETE n")

# 创建一些节点和关系
person_a = Node("Person", name="A")
person_b = Node("Person", name="B")
person_c = Node("Person", name="C")
graph.create(person_a | person_b | person_c) # 使用 | 运算符可以创建多个节点

# A FOLLOWS B
graph.create(Relationship(person_a, "FOLLOWS", person_b))
# B LIKES C
graph.create(Relationship(person_b, "LIKES", person_c))
print("节点和关系已创建。")

# 查询 Person A 已关注了谁 (有方向性)
# 使用 graph.run() 执行 Cypher 查询
result_a_follows = graph.run("""
    MATCH (p:Person {name: 'A'})-[:FOLLOWS]->(followed_person)
    // 匹配从 Person A 出发,类型为 FOLLOWS 的关系,指向 followed_person
    RETURN followed_person.name AS followed_name
""").evaluate() # evaluate() 返回查询结果的第一个值

print(f"
Person A 已关注了: {
              result_a_follows}")

# 查询谁喜欢 Person C (反向查找)
result_who_likes_c = graph.run("""
    MATCH (liker_person)-[:LIKES]->(p:Person {name: 'C'})
    // 匹配指向 Person C,类型为 LIKES 的关系,从 liker_person 出发
    RETURN liker_person.name AS liker_name
""").evaluate()

print(f"谁喜欢 Person C: {
              result_who_likes_c}")

# 忽略方向性查找
# 查找 B 与谁有 LIKES 关系,不关心方向
result_b_liked_by_or_likes = graph.run("""
    MATCH (p:Person {name: 'B'})-[:LIKES]-(other_person)
    // 使用 -[:LIKES]- 忽略方向性
    RETURN other_person.name AS other_name
""").evaluate()

print(f"Person B 通过 LIKES 关系连接到的对象 (忽略方向): {
              result_b_liked_by_or_likes}")
2.2.3 Path 对象:序列化的图遍历结果

Path 对象是 py2neo 中表示图遍历结果的序列。它由一系列交替出现的节点和关系组成,形成一个完整的路径。

构造 Path 对象:

Path 对象通常不是直接手动构造的(尽管可以),而是通过 Cypher 查询返回路径结果时自动创建。

# 导入 Graph, Node, Relationship, Path 类
from py2neo import Graph, Node, Relationship, Path

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n) DETACH DELETE n")

# 创建一个小的图用于演示路径
a = Node("City", name="A")
b = Node("City", name="B")
c = Node("City", name="C")
d = Node("City", name="D")

# 创建城市间的航线关系
r1 = Relationship(a, "FLIES_TO", b, distance=100)
r2 = Relationship(b, "FLIES_TO", c, distance=150)
r3 = Relationship(c, "FLIES_TO", d, distance=200)
r4 = Relationship(a, "DRIVES_TO", c, distance=250)

# 使用 | 操作符批量创建节点和关系
# 注意:在 py2neo 6.x 版本中,可以使用 `graph.create(a | b | c | d | r1 | r2 | r3 | r4)`
# 但在 py2neo 2021.x 版本中,建议分别创建或使用 graph.create(a, b, c, d), graph.create(r1, r2, r3, r4)
# 或者更简洁地,直接在 Cypher 中创建
graph.run("""
    CREATE (a:City {name: 'A'})-[:FLIES_TO {distance: 100}]->(b:City {name: 'B'})
    CREATE (b)-[:FLIES_TO {distance: 150}]->(c:City {name: 'C'})
    CREATE (c)-[:FLIES_TO {distance: 200}]->(d:City {name: 'D'})
    CREATE (a)-[:DRIVES_TO {distance: 250}]->(c)
""")
print("城市和航线数据已创建。")

# 查询从 A 到 D 的所有路径
# Cypher 的 RETURN path 关键字会返回一个 Path 对象
path_query = """
    MATCH path = (a:City {name: 'A'})-[*]->(d:City {name: 'D'})
    // 匹配从 City A 到 City D 的所有路径,[*]-> 表示任意类型、任意长度的有向路径
    RETURN path
    // 返回整个路径对象
"""
paths = graph.run(path_query).data()
// .data() 返回一个列表,每个元素是一个字典,包含查询结果

print("
从 A 到 D 的路径:")
for record in paths:
    path = record["path"] # 从结果字典中获取 Path 对象
    print(f"  路径长度 (跳数): {
              len(path)}") # len(path) 返回路径中的关系数量,即跳数
    print(f"  路径节点数量: {
              path.nodes.__len__()}") # path.nodes 返回路径中的所有节点,__len__() 是其数量
    
    # 遍历路径中的元素 (节点和关系交替)
    print("  路径详情:")
    for i, item in enumerate(path): # path 对象是可迭代的
        if i % 2 == 0: # 偶数索引是节点
            print(f"    节点: {
              item['name']} (ID: {
              item.identity})")
        else: # 奇数索引是关系
            print(f"    关系: -[:{
              item.type} {
             {distance: {
              item['distance']}}}]-> (ID: {
              item.identity})")
    print("-" * 20)

Path 对象的方法和属性:

nodes: 一个元组(tuple),包含路径中的所有节点对象。
relationships: 一个元组,包含路径中的所有关系对象。
start_node: 路径的起始节点。
end_node: 路径的结束节点。
__len__(): 返回路径中关系的跳数。
__iter__(): 使 Path 对象可迭代,按顺序返回节点和关系。
__getitem__(index): 通过索引访问路径中的节点或关系。

# 导入 Path 对象
from py2neo import Node, Relationship, Path

# 手动构建一个简单的路径(虽然不常见,但可以帮助理解内部结构)
node1 = Node("Test", name="Node1")
node2 = Node("Test", name="Node2")
node3 = Node("Test", name="Node3")

rel1 = Relationship(node1, "CONNECTS_TO", node2)
rel2 = Relationship(node2, "RELATES_TO", node3)

# Path 构造函数接收一个节点和关系交替的序列
manual_path = Path(node1, rel1, node2, rel2, node3)
// Path 的构造函数接受一个可变参数列表,依次是节点、关系、节点、关系...
// 必须以节点开始和结束,中间节点与关系交替出现

print(f"
手动构建的路径长度: {
              len(manual_path)}")
// 获取路径的跳数
print(f"手动构建的路径起始节点: {
              manual_path.start_node['name']}")
// 获取路径的起始节点名称
print(f"手动构建的路径结束节点: {
              manual_path.end_node['name']}")
// 获取路径的结束节点名称

print("
手动路径中的节点:")
for n in manual_path.nodes:
    print(f"  - {
              n['name']}")
    // 遍历路径中的所有节点并打印其名称

print("
手动路径中的关系:")
for r in manual_path.relationships:
    print(f"  - {
              r.type}")
    // 遍历路径中的所有关系并打印其类型

# 通过索引访问路径元素
# 0 是第一个节点,1 是第一个关系,2 是第二个节点,以此类推
print(f"
路径的第一个元素 (节点): {
              manual_path[0]['name']}")
// 访问路径中第一个元素,它是节点,并获取其名称
print(f"路径的第二个元素 (关系): {
              manual_path[1].type}")
// 访问路径中第二个元素,它是关系,并获取其类型

Path 对象的实用价值:

Path 对象在图分析中非常有用,尤其是当需要对找到的路径进行进一步处理时:

路径可视化: 可以将 Path 对象传递给图可视化库(如 pyvisnetworkx 结合绘图功能)进行渲染。
路径属性聚合: 轻松遍历路径上的所有节点和关系,计算总距离、总成本等聚合属性。
模式匹配验证: 获取路径后,可以进一步检查路径是否满足特定的业务规则(例如,路径上不能包含某个特定类型的节点或关系)。

通过将图元素封装为 Python 对象,py2neo 极大地提升了与 Neo4j 数据库交互的便利性和表达力,使得图数据的处理和分析在 Python 中变得更加自然和高效。

**第二章:不仅仅是驱动:Py2neo 的架构哲学与对象模型 **

2.3 graph.run() vs. 对象匹配器:命令式与声明式的二元选择

py2neo 提供了两种截然不同的方式来从数据库中查询数据:一种是直接执行 Cypher 查询字符串的 graph.run() 方法,另一种是更具 Python 风格的对象匹配器,如 graph.nodes.match()。这两种方法分别代表了命令式(Imperative)和声明式(Declarative)的查询哲学。选择哪一种取决于查询的复杂度、代码的可读性要求以及开发者的偏好。

2.3.1 graph.run():Cypher 的终极力量

graph.run(cypher_query, **parameters)py2neo 中最基础、最直接,也是最强大的查询接口。它接受一个 Cypher 查询字符串和一组参数,然后将它们发送到 Neo4j 服务器执行。

返回的对象:Cursor

graph.run() 的直接返回值是一个 Cursor 对象。这个 Cursor 对象是一个迭代器,它封装了从服务器流式传输回来的结果。它并不会一次性把所有结果都加载到内存中,这使得处理大规模结果集成为可能。

Cursor 对象提供了多种消费结果的方式:

data(): 这是最常用的方法。它会遍历整个结果集,并将每一行(Record)转换为一个 Python 字典,最后返回一个包含所有字典的列表。

# 导入 Graph 类
from py2neo import Graph

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n) DETACH DELETE n")
graph.run("""
    CREATE (:Movie {title: 'The Matrix', released: 1999}),
           (:Movie {title: 'The Matrix Reloaded', released: 2003}),
           (:Movie {title: 'The Matrix Revolutions', released: 2003})
    // 创建三个电影节点
""")

# 使用 graph.run() 执行查询
cursor = graph.run("MATCH (m:Movie) WHERE m.released > 2000 RETURN m.title, m.released")
// 执行一个Cypher查询,返回2000年之后上映的电影的标题和年份

# 使用 data() 方法获取所有结果
results_list = cursor.data()
// .data() 会将游标中的所有记录耗尽,并返回一个字典列表

print("使用 .data() 获取的结果 (字典列表):")
for record in results_list:
    print(f"  - 标题: {
                record['m.title']}, 上映年份: {
                record['m.released']}")
    // 字典的键是 Cypher RETURN 子句中指定的别名

evaluate(): 当你确定查询只会返回单个值(一行一列)时,这个方法非常方便。它直接返回那个值,而不是一个列表或字典。

# 导入 Graph 类
from py2neo import Graph

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 查询电影的总数
movie_count = graph.run("MATCH (m:Movie) RETURN count(m)").evaluate()
// .evaluate() 直接返回 count(m) 的计算结果,即一个整数

print(f"
数据库中电影的总数是: {
                movie_count}")

# 如果查询返回多行或多列,evaluate() 只会返回第一行第一列的值
first_movie_title = graph.run("MATCH (m:Movie) RETURN m.title ORDER BY m.released").evaluate()
// 这个查询返回多个标题,但 .evaluate() 只会给你第一个
print(f"按上映年份排序的第一部电影是: {
                first_movie_title}")

直接迭代 Cursor: Cursor 对象本身是可迭代的。当处理非常大的结果集,不想一次性将所有数据加载到内存时,这是最高效的方式。

# 导入 Graph 类
from py2neo import Graph

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 直接迭代游标
print("
直接迭代 Cursor 对象:")
cursor = graph.run("MATCH (m:Movie) RETURN m.title AS title, m.released AS year")
// 再次执行查询

for record in cursor:
    # 每一条 record 是一个 py2neo.data.Record 对象,可以像字典一样访问
    print(f"  - 标题: {
                record['title']}, 上映年份: {
                record['year']}")
    // 像访问字典一样通过 RETURN 子句中的别名来访问数据
    # 也可以通过索引访问
    # print(f"  - 标题: {record[0]}, 上映年份: {record[1]}")

参数化:安全与性能的双重保障

绝对不要使用 Python 的字符串格式化(如 f-string 或 %)来构建 Cypher 查询。这会带来两大风险:

Cypher 注入攻击: 如果用户输入被直接拼接到查询字符串中,恶意用户可以构造特殊的输入来执行非预期的、破坏性的数据库操作。
性能下降: Neo4j 会缓存编译后的查询计划。如果每次查询的字符串都因参数不同而变化,Neo4j 将无法重用缓存的查询计划,每次都需要重新解析、编译查询,造成不必要的性能开销。

正确的做法是使用参数化查询。

# 导入 Graph 类
from py2neo import Graph

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# --- 错误的做法 (危险且低效) ---
# user_input_title = "The Matrix' OR 1=1" # 一个恶意的输入示例
# bad_query = f"MATCH (m:Movie {
            {title: '{user_input_title}'}}) RETURN m"
# print(f"错误的查询语句: {bad_query}") # 千万不要在生产环境中这样做

# --- 正确的做法 (安全且高效) ---
def find_movie_by_title(title_param):
    """
    使用参数化查询安全地查找电影。
    """
    query = """
        MATCH (m:Movie {title: $title_to_find})
        // 在 Cypher 查询中使用一个占位符,例如 $title_to_find
        RETURN m
    """
    # 将参数作为一个字典传递给 graph.run()
    # py2neo 会安全地处理参数,将其与查询分开发送到服务器
    result = graph.run(query, title_to_find=title_param).data()
    // 'title_to_find' 是字典的键,对应 Cypher 中的占位符名称
    return result

# 正常查找
matrix_movie = find_movie_by_title("The Matrix")
if matrix_movie:
    # 返回的结果是 hydrated 的 Node 对象
    movie_node = matrix_movie[0]['m']
    print(f"
找到电影: {
              movie_node['title']}, 上映于 {
              movie_node['released']}")
else:
    print("
未找到电影 'The Matrix'")

# 即使有恶意输入,也是安全的
# 数据库会将整个 "The Matrix' OR 1=1" 作为一个普通字符串去查找,而不会找到任何匹配项
malicious_result = find_movie_by_title("The Matrix' OR 1=1")
if not malicious_result:
    print("恶意输入未能找到任何电影,查询是安全的。")

结果水合(Result Hydration)

py2neo 的一个强大之处在于它不仅仅返回原始数据。当 Cursor 返回结果时,py2neo 会检查返回数据的类型,并将其“水合”(Hydrate)成对应的 Python 对象。

返回节点 -> py2neo.data.Node 对象
返回关系 -> py2neo.data.Relationship 对象
返回路径 -> py2neo.data.Path 对象
返回列表 -> Python list
返回地图 -> Python dict
返回数值、字符串、布尔值 -> Python int, float, str, bool

这种自动类型转换使得后续处理变得非常直观和方便。

2.3.2 对象匹配器:声明式的简洁之美

对于简单的节点和关系查找操作,手写 MATCH...WHERE...RETURN 可能会显得有些繁琐。py2neo 提供了一套基于对象的匹配器 API,让这类操作更加简洁和 Pythonic。

graph.nodesNodeMatcher

graph.nodes 属性返回一个 NodeMatcher 对象。这个对象是查询节点的入口。

# 导入 Graph 类
from py2neo import Graph

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
# NodeMatcher 实例与一个 Graph 实例绑定
node_matcher = graph.nodes

# --- 基本用法 ---
# 匹配单个节点: .match().first()
# 相当于 Cypher: MATCH (m:Movie {title: 'The Matrix'}) RETURN m LIMIT 1
the_matrix = node_matcher.match("Movie", title="The Matrix").first()
// .match() 的第一个参数是标签,后续是属性的关键字参数
// .first() 返回第一个匹配项或 None

if the_matrix:
    print(f"使用 NodeMatcher 找到: {
              the_matrix['title']} (ID: {
              the_matrix.identity})")

# 匹配所有节点: .match() 返回一个可迭代的 NodeMatch 对象
# 相当于 Cypher: MATCH (m:Movie {released: 2003}) RETURN m
movies_2003 = node_matcher.match("Movie", released=2003)

print("
2003年上映的电影:")
for movie in movies_2003:
    # movies_2003 是一个迭代器,可以高效地遍历结果
    print(f"  - {
              movie['title']}")

# 获取列表: .all()
movies_2003_list = node_matcher.match("Movie", released=2003).all()
// .all() 将迭代器中的所有结果收集到一个列表中
print(f"
使用 .all() 获取的列表长度: {
              len(movies_2003_list)}")


# --- 高级匹配 ---
# 不等于: __ne
# 相当于 Cypher: MATCH (m:Movie) WHERE m.released <> 2003 RETURN m
other_movies = node_matcher.match("Movie").where("_.released <> 2003")
// .where() 方法接受一个 Cypher 条件表达式字符串
// 在表达式中,用下划线 '_' 代表正在匹配的节点
print("
非2003年上映的电影:")
for movie in other_movies:
    print(f"  - {
              movie['title']}")

# 大于/小于等:
# 相当于 Cypher: MATCH (m:Movie) WHERE m.released >= 2000 AND m.released < 2004
recent_movies = node_matcher.match("Movie").where("_.released >= 2000", "_.released < 2004")
// .where() 可以接受多个条件字符串,它们之间是 AND 的关系
print("
2000年到2004年之间的电影:")
for movie in recent_movies:
    print(f"  - {
              movie['title']}")

graph.relationshipsRelationshipMatcher

NodeMatcher 类似,graph.relationships 属性返回一个 RelationshipMatcher 对象,用于查找关系。

# 导入 Graph, Node, Relationship 类
from py2neo import Graph, Node, Relationship

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n) DETACH DELETE n")

# 创建一些人和电影以及他们之间的关系
keanu = Node("Person", name="Keanu Reeves")
carrie = Node("Person", name="Carrie-Anne Moss")
matrix = Node("Movie", title="The Matrix")

graph.create(keanu | carrie | matrix) # 批量创建节点

# 创建关系
rel1 = Relationship(keanu, "ACTED_IN", matrix, role="Neo")
rel2 = Relationship(carrie, "ACTED_IN", matrix, role="Trinity")
graph.create(rel1 | rel2)
print("演员和电影关系已创建。")

# RelationshipMatcher 实例
rel_matcher = graph.relationships

# --- 基本用法 ---
# 匹配所有 ACTED_IN 关系
# 相当于 Cypher: MATCH ()-[r:ACTED_IN]->() RETURN r
acted_in_rels = rel_matcher.match(r_type="ACTED_IN")
// 使用 r_type 参数指定关系类型

print("
所有 ACTED_IN 关系:")
for rel in acted_in_rels:
    # rel 是一个 Relationship 对象
    start_person = rel.start_node['name']
    end_movie = rel.end_node['title']
    role = rel['role']
    print(f"  - {
              start_person} 在电影《{
              end_movie}》中扮演 {
              role}")

# 匹配特定节点之间的关系
# 注意: 传入的节点必须是绑定节点 (有 .identity)
# 相当于 Cypher: MATCH (p)-[r:ACTED_IN]->(m) WHERE id(p)=X AND id(m)=Y RETURN r
keanu_acts_in_matrix = rel_matcher.match(nodes=(keanu, matrix), r_type="ACTED_IN").first()
// nodes 参数是一个元组,包含起始节点和结束节点
// 顺序很重要,(keanu, matrix) 表示从 keanu 到 matrix 的关系

if keanu_acts_in_matrix:
    print(f"
Keanu Reeves 在 The Matrix 中的角色是: {
              keanu_acts_in_matrix['role']}")
2.3.3 何时使用何种方法?
场景 推荐方法 为什么
简单的 CRUD 操作 对象匹配器 语法简洁,代码更 Pythonic,不易出错。例如:根据 ID 获取用户,查找所有带特定标签的节点。
复杂的图模式匹配 graph.run() 匹配器无法表达复杂路径(如 (a)-[*1..5]->(b))、OPTIONAL MATCH、多 MATCH 子句等。
聚合、排序、分页 graph.run() 匹配器不支持 COUNT, AVG, ORDER BY, SKIP, LIMIT 等 Cypher 功能。
调用 GDS 或 APOC 程序 graph.run() 只有 graph.run() 才能执行 CALL 子句来调用数据库程序。
需要最高性能的批量操作 graph.run() 结合 UNWIND 的批量数据写入或更新,必须使用 graph.run()
动态构建查询 graph.run() (带参数) 虽然匹配器的 .where() 提供一些动态性,但对于需要动态添加 MATCH 子句或 RETURN 表达式的场景,使用 graph.run() 拼接 Cypher 字符串(并使用参数)更灵活。
代码可读性优先的简单查询 对象匹配器 对于不熟悉 Cypher 的团队成员,匹配器代码可能更容易理解。

内部机制揭秘:

重要的是要理解,对象匹配器只是 graph.run() 的一层语法糖。当你调用 node_matcher.match("Movie", title="The Matrix").first() 时,py2neo 在内部会生成一个类似 MATCH (a:Movie) WHERE a.title = $p1 RETURN a LIMIT 1 的 Cypher 查询,然后调用 graph.run() 并传入参数 { "p1": "The Matrix" }

理解这一点有助于破除对匹配器的神秘感,并认识到它的局限性——它的能力范围被严格限制在它能生成的那些简单的 Cypher 查询模式内。


2.4 Subgraph 与 OGM:超越单体对象的图结构抽象

py2neo 不仅提供了表示单个节点和关系的原子对象,还提供了更高层次的抽象来处理图的片段(Subgraph)和将图结构映射到 Python 类(OGM)。这些工具为在应用程序中管理和操作复杂的图数据提供了强大的支持。

2.4.1 Subgraph:内存中的图片段

Subgraph 对象是 py2neo 中表示一组节点和关系的集合。它是一个纯粹的客户端(Python 内存中)构造,可以看作是一个微型的、独立的图。

Subgraph 的构成:

一个 Subgraph 主要由两部分组成:

一个节点的集合(Set)
一个关系的集合(Set)

关系中的节点必须也存在于 Subgraph 的节点集合中,以保证其完整性。

# 导入所需类
from py2neo import Node, Relationship, Subgraph

# 创建一些独立的节点和关系对象
a = Node("LabelA", name="A")
b = Node("LabelB", name="B")
c = Node("LabelC", name="C")
ab = Relationship(a, "RELATES_TO", b)
bc = Relationship(b, "RELATES_TO", c)

# 创建一个 Subgraph 对象,包含 a, b 两个节点和它们之间的关系 ab
subgraph1 = Subgraph(nodes=[a, b], relationships=[ab])
// 构造函数接收一个节点列表和一个关系列表

# Subgraph 的节点和关系可以通过 .nodes 和 .relationships 属性访问
print(f"Subgraph1 的节点数: {
              len(subgraph1.nodes)}")
// .nodes 是一个 frozenset
print(f"Subgraph1 的关系数: {
              len(subgraph1.relationships)}")
// .relationships 是一个 frozenset

# Subgraph 支持 set 操作,这是其强大之处
# 创建第二个 Subgraph
subgraph2 = Subgraph(nodes=[b, c], relationships=[bc])
print(f"Subgraph2 的节点数: {
              len(subgraph2.nodes)}")

# 使用 | (union) 操作符合并两个 Subgraph
merged_subgraph = subgraph1 | subgraph2
// 合并后的 subgraph 包含 s1 和 s2 的所有节点和关系,并自动去重
print(f"
合并后的 Subgraph:")
print(f"  节点数: {
              len(merged_subgraph.nodes)}") # 结果应该是3 (A, B, C)
print(f"  关系数: {
              len(merged_subgraph.relationships)}") # 结果应该是2 (ab, bc)

# 使用 & (intersection) 操作符查找共同部分
intersection_subgraph = subgraph1 & subgraph2
// 交集 subgraph 只包含 s1 和 s2 中共有的节点和关系
print(f"
交集 Subgraph:")
# 节点 B 是共有的
print(f"  节点数: {
              len(intersection_subgraph.nodes)}") # 结果应该是1 (B)
for node in intersection_subgraph.nodes: print(f"    - {
              node['name']}")
# 没有共有的关系
print(f"  关系数: {
              len(intersection_subgraph.relationships)}") # 结果应该是0

# 使用 - (difference) 操作符
difference_subgraph = subgraph1 - subgraph2
// 差集 subgraph 包含在 s1 中但不在 s2 中的节点和关系
print(f"
差集 Subgraph (subgraph1 - subgraph2):")
print(f"  节点数: {
              len(difference_subgraph.nodes)}") # 结果应该是1 (A)
print(f"  关系数: {
              len(difference_subgraph.relationships)}") # 结果应该是1 (ab)

Subgraph 的应用场景:

构建和批量创建: 你可以在 Python 代码中以面向对象的方式构建一个复杂的图结构(一个 Subgraph),然后通过一次数据库调用将其全部创建。

# 导入 Graph 和其他所需类
from py2neo import Graph, Node, Relationship, Subgraph

# ... (前面的 a, b, c, ab, bc 定义) ...
# a, b, c, ab, bc 都是无状态的对象
complex_structure = Subgraph(nodes=[a, b, c], relationships=[ab, bc])

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n) DETACH DELETE n")

# 在一次事务中创建整个 subgraph
# 这比逐个创建节点和关系要高效得多
tx = graph.begin()
tx.create(complex_structure)
tx.commit()

print("
使用 Subgraph 批量创建了 A -> B -> C 结构。")
node_count = graph.run("MATCH (n) RETURN count(n)").evaluate()
rel_count = graph.run("MATCH ()-[r]->() RETURN count(r)").evaluate()
print(f"数据库中现在有 {
                node_count} 个节点和 {
                rel_count} 个关系。")

从查询结果构建: 一个返回节点和关系的查询结果可以被“凝聚”(Coalesce)成一个 Subgraph 对象。

# 导入 Graph 类
from py2neo import Graph

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 查询一个图模式
results = graph.run("MATCH path = (p:Person)-[:ACTED_IN]->(m:Movie) RETURN path").data()
// 假设数据库中已有 Person 和 Movie 数据

if results:
    # graph.data.coalesce() 是一个关键函数
    # 它会检查输入的数据(通常是查询结果的列表),并从中提取所有的节点和关系,
    # 组装成一个单一的 Subgraph 对象,并处理掉所有的重复项。
    actor_movie_subgraph = graph.data.coalesce(results)
    // results 是一个字典列表,每个字典可能包含 'path',path里有node和rel
    
    print(f"
从查询结果凝聚成的 Subgraph:")
    print(f"  包含节点数: {
                len(actor_movie_subgraph.nodes)}")
    print(f"  包含关系数: {
                len(actor_movie_subgraph.relationships)}")

    # 现在你可以在客户端对这个 subgraph 进行分析、修改或与其他 subgraph 合并

客户端缓存: 对于应用中不经常变动但频繁访问的图部分(例如,网站的类别层级结构),可以查询一次,将其凝聚成 Subgraph 对象并缓存在内存中,后续的操作直接访问这个内存对象,避免了频繁的数据库查询。

2.4.2 OGM (Object-Graph Mapper):领域驱动设计的图实现

对象图映射(OGM)借鉴了关系型数据库世界中对象关系映射(ORM)的思想。它旨在将数据库中的图结构(带特定标签的节点、特定类型的关系)映射到 Python 的类和对象上,从而允许开发者以更贴近业务领域模型的语言来编写代码。

py2neo.ogm 模块提供了 OGM 功能。

定义模型:

OGM 的核心是定义模型类,这些类继承自 py2neo.ogm.GraphObject

# 导入 OGM 相关组件
from py2neo.ogm import GraphObject, Property, RelatedTo
from py2neo import Graph

# --- 定义模型类 ---

class Movie(GraphObject):
    # __primarykey__ 指定用于唯一标识此对象的属性。
    # 当调用 .match() 时,会使用这个键来生成 WHERE 子句。
    __primarykey__ = "title"

    # 使用 Property() 定义节点的属性
    title = Property()
    tagline = Property()
    released = Property()
    
    # 使用 RelatedTo 定义关系
    # 第一个参数是相关的模型类名 (字符串形式)
    # 第二个参数是关系的类型
    # 这定义了一个从 Person 到 Movie,类型为 ACTED_IN 的关系
    actors = RelatedTo("Person", "ACTED_IN")
    # 这定义了一个从 Person 到 Movie,类型为 DIRECTED 的关系
    directors = RelatedTo("Person", "DIRECTED")

class Person(GraphObject):
    __primarykey__ = "name"

    name = Property()
    born = Property()

    # 从 Person 的角度定义关系
    # RelatedFrom 定义了一个入向的关系
    # 从 Movie 到 Person,类型为 ACTED_IN
    acted_in = RelatedTo(Movie, "ACTED_IN")
    # 从 Movie 到 Person,类型为 DIRECTED
    directed = RelatedTo(Movie, "DIRECTED")
    
print("OGM 模型 Movie 和 Person 已定义。")

使用 OGM 进行 CRUD 操作:

# 连接数据库
graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n) DETACH DELETE n")

# --- 创建对象 ---
# 实例化 OGM 对象,就像普通的 Python 对象一样
keanu = Person()
keanu.name = "Keanu Reeves"
keanu.born = 1964

lana = Person()
lana.name = "Lana Wachowski"
lana.born = 1965

matrix = Movie()
matrix.title = "The Matrix"
matrix.released = 1999

# --- 创建关系 ---
# 使用 .add() 方法在 RelatedTo 集合上添加关系
# 关系本身也可以有属性
matrix.actors.add(keanu, role="Neo")
matrix.directors.add(lana)

# --- 保存到数据库 ---
# 使用 graph.push() 将 OGM 对象及其关系推送到数据库
# py2neo 会自动生成相应的 CREATE 或 MERGE Cypher 语句
graph.push(matrix)
print("使用 OGM 创建了 Keanu, Lana 和 The Matrix,以及它们之间的关系。")

# --- 查询对象 ---
# 使用模型的 .match() 方法进行查询
# 它会使用 __primarykey__ 来查找
retrieved_matrix = Movie.match(graph, "The Matrix").first()
// Movie.match(graph, 'The Matrix') 相当于 graph.nodes.match('Movie', title='The Matrix')

if retrieved_matrix:
    print(f"
查询到的电影: {
              retrieved_matrix.title} (发行年份: {
              retrieved_matrix.released})")
    
    # 访问关系就像访问对象的属性一样
    print("  导演:")
    for director in retrieved_matrix.directors:
        print(f"    - {
              director.name} (生于: {
              director.born})")
        // director 是一个 Person OGM 对象

    print("  演员:")
    # .relations(actor) 可以获取关系上的属性
    for actor in retrieved_matrix.actors:
        role = retrieved_matrix.actors.get(actor, "role")
        // .get(object, property_name) 获取关系上的属性
        print(f"    - {
              actor.name} 扮演 {
              role}")

# --- 更新对象 ---
keanu.born = 1965 # 假设我们搞错了年份
graph.push(keanu) # 再次 push 会更新已有节点
print("
Keanu Reeves 的出生年份已更新。")

# --- 删除对象 ---
# graph.delete(keanu) # 这会删除 Keanu 节点,但如果它还有关系,可能会报错
# graph.separate() 可以先删除关系
graph.separate(matrix.actors) # 删除所有 ACTED_IN 关系
graph.delete(matrix) # 然后删除电影节点
print("已删除电影及其关系。")

OGM 的优缺点:

优点:

高层抽象: 代码更贴近业务领域,可读性强。
减少模板代码: 无需为简单的 CRUD 操作编写 Cypher。
领域驱动设计: 非常适合将应用程序的领域模型直接映射到图上。

缺点:

性能黑盒: OGM 生成的 Cypher 查询可能不是最优的。对于性能敏感的复杂查询,手写 Cypher 更好。
灵活性受限: OGM 很难表达复杂的图算法、多跳路径查询或 APOC/GDS 程序调用。
学习曲线: 需要学习 OGM 自身的 API 和设计模式。
“阻抗失配”: 当图模型的结构与面向对象的类结构不完全匹配时,OGM 会变得很笨拙。

OGM 是一个强大的工具,但不是万能药。它最适合用在应用程序中代表核心业务实体(用户、产品、订单等)的部分,而将复杂的分析和遍历任务交给原生的 graph.run() 和 Cypher。

第三部分:高级数据操作与性能优化

第三章:批量操作的艺术:UNWIND 与事务批处理

3.1 UNWIND 的魔力:从 Python 列表到图结构的高效转换

UNWIND 是 Cypher 中一个极其强大的子句。它的核心功能是将一个列表(List)展开,为列表中的每一个元素生成一行记录。这就像是在 Cypher 内部进行了一次 for 循环,从而使得我们能够将一批数据作为一个参数传递给查询,然后在数据库端进行迭代处理。

UNWIND 的基本语法:

UNWIND [1, 2, 3, 4] AS number
RETURN number

这个查询会返回四行,每行一个数字。当我们把 Python 中的数据列表作为参数传入时,UNWIND 就成为了批量操作的基石。

应用场景一:批量创建节点

假设我们有一个 Python 列表,其中包含数百个新用户的信息,每个用户信息是一个字典。

天真的(Naive)方法(性能极差):

import time
from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n:User) DETACH DELETE n") # 清理环境

# 准备一批用户数据
users_to_create = [{
            "id": i, "name": f"User_{
              i}", "region": "region_" + str(i % 5)} for i in range(1000)]
// 生成一个包含1000个用户字典的列表

# 错误的方式:在 Python for 循环中逐条创建
print("开始使用逐条创建的方式...")
start_time = time.time()
for user_data in users_to_create:
    graph.create(Node("User", **user_data))
    // 对于列表中的每个用户,都执行一次 graph.create()
    // 这意味着1000次网络往返和1000个独立的事务
end_time = time.time()
print(f"逐条创建1000个节点耗时: {
              end_time - start_time:.4f} 秒")
// 打印耗时

高效的 UNWIND 方法:

import time
from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n:User) DETACH DELETE n") # 清理环境

users_to_create = [{
            "id": i, "name": f"User_{
              i}", "region": "region_" + str(i % 5)} for i in range(1000)]
// 同样准备1000个用户的数据

# 正确的方式:使用 UNWIND
print("
开始使用 UNWIND 批量创建...")
# 定义批量创建的 Cypher 查询
# 这个查询只会被发送到数据库一次
unwind_query = """
    UNWIND $users AS user_properties
    // UNWIND 将传入的 $users 列表(参数名)展开,每一项命名为 user_properties
    CREATE (u:User)
    // 为列表中的每一项创建一个新的 User 节点
    SET u = user_properties
    // 使用 SET 将该项(一个字典)中的所有键值对设置为节点的属性
"""
start_time = time.time()
# 将整个列表作为单个参数传递
graph.run(unwind_query, users=users_to_create)
// 整个操作只有一次网络往返和一个事务
end_time = time.time()
print(f"使用 UNWIND 批量创建1000个节点耗时: {
              end_time - start_time:.4f} 秒")
// 打印耗时,你会发现它比前一个方法快几个数量级

# 验证结果
user_count = graph.run("MATCH (u:User) RETURN count(u)").evaluate()
print(f"数据库中现在有 {
              user_count} 个用户。")

性能对比分析
在我的本地机器上,逐条创建可能需要 5-10 秒,而 UNWIND 方法通常在 0.1 秒内完成。性能差异是 50-100 倍。数据量越大,这种差异越悬殊。原因在于:

网络开销:前者有 1000 次 TCP/IP 往返,后者只有 1 次。网络延迟是主要瓶颈。
事务开销:前者有 1000 次事务的开启、提交和日志记录,后者只有 1 次。
查询解析:前者中 py2neo 每次都要生成 Cypher,数据库每次都要解析;后者数据库只需解析一次查询计划,然后高效地执行 1000 次数据插入。

应用场景二:批量创建关系

这是更常见也更复杂的场景。通常我们有一批关系数据,需要先找到关系的起点和终点,然后创建它们之间的关系。

假设我们有一批交易数据,格式为 (付款用户ID, 收款用户ID, 金额)

import time
from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# 确保我们有用户节点,并且在 id 属性上有唯一性约束和索引,这对于快速 MATCH 至关重要
try:
    graph.schema.create_uniqueness_constraint("User", "id")
    print("为 (User.id) 创建了唯一性约束。")
except Exception:
    print("(User.id) 的唯一性约束已存在。")

# 准备一批交易数据
# 每个字典代表一笔交易
transactions = [
    {
            "from_id": 10, "to_id": 20, "amount": 99.9},
    {
            "from_id": 20, "to_id": 30, "amount": 150.0},
    {
            "from_id": 10, "to_id": 40, "amount": 25.5},
    # ... 假设这里有成千上万条交易
]
# 为了演示,我们先创建一些用户
graph.run("""
    UNWIND range(0, 100) AS i
    MERGE (:User {id: i})
""")

# 使用 UNWIND 批量创建关系
print("开始使用 UNWIND 批量创建关系...")
# 批量创建关系的 Cypher 查询
# 这个查询结合了 UNWIND, MATCH 和 MERGE
batch_rel_creation_query = """
    UNWIND $tx_list AS tx // 将交易列表展开,每一项命名为 tx
    MATCH (payer:User {id: tx.from_id}) // 找到付款方用户 (得益于 User.id 上的索引,这步很快)
    MATCH (recipient:User {id: tx.to_id}) // 找到收款方用户
    MERGE (payer)-[r:PAID_TO]->(recipient) // 使用 MERGE 创建关系,如果关系已存在则不做任何事
    ON CREATE SET r.amount = tx.amount, r.timestamp = datetime() // 如果关系是新创建的,设置属性
"""
# 使用 MERGE 而不是 CREATE 是一个好习惯,可以防止意外创建重复的关系

start_time = time.time()
graph.run(batch_rel_creation_query, tx_list=transactions)
// 将整个交易列表作为参数传入
end_time = time.time()

print(f"使用 UNWIND 批量创建 {
              len(transactions)} 条关系耗时: {
              end_time - start_time:.4f} 秒")

# 验证结果
rel_count = graph.run("MATCH ()-[:PAID_TO]->() RETURN count(*)").evaluate()
print(f"数据库中现在有 {
              rel_count} 条 PAID_TO 关系。")

这个模式是高性能图数据导入的核心。关键在于:将数据作为参数传递,让数据库自己去循环和匹配。前提是用于 MATCH 的属性(如 User.id)必须有索引,否则数据库端的 MATCH 会退化成全表扫描,UNWIND 带来的优势将被抵消。

应用场景三:批量更新属性

假设我们需要根据一份外部数据,更新数据库中一批产品的库存和价格。

import time
from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n:Product) DETACH DELETE n")

# 准备一些产品数据
graph.run("""
    UNWIND range(1, 100) AS i
    CREATE (:Product {sku: 'SKU-' + i, stock: 100, price: 10.0})
""")
try:
    graph.schema.create_uniqueness_constraint("Product", "sku")
    print("为 (Product.sku) 创建了唯一性约束。")
except Exception:
    print("(Product.sku) 的唯一性约束已存在。")

# 准备要更新的数据
# 每个字典包含产品的唯一标识符(sku)和要更新的属性
updates = [
    {
            "sku": "SKU-5", "new_stock": 50, "new_price": 12.5},
    {
            "sku": "SKU-18", "new_stock": 0}, # 价格不变
    {
            "sku": "SKU-33", "new_price": 9.5}, # 库存不变
    # ... 假设有很多更新
]

# 使用 UNWIND 批量更新
print("开始使用 UNWIND 批量更新...")
batch_update_query = """
    UNWIND $update_list AS update_data // 展开更新数据列表
    MATCH (p:Product {sku: update_data.sku}) // 匹配到要更新的产品
    SET p.stock = COALESCE(update_data.new_stock, p.stock), // 使用 COALESCE 函数,如果 update_data.new_stock 存在,则使用它,否则使用原有的 p.stock 值
        p.price = COALESCE(update_data.new_price, p.price) // 这样可以灵活地只更新部分属性
"""

start_time = time.time()
graph.run(batch_update_query, update_list=updates)
end_time = time.time()

print(f"使用 UNWIND 批量更新 {
              len(updates)} 个产品耗时: {
              end_time - start_time:.4f} 秒")

# 验证更新
updated_product_5 = graph.nodes.match("Product", sku="SKU-5").first()
print(f"SKU-5 的新库存: {
              updated_product_5['stock']}, 新价格: {
              updated_product_5['price']}")
updated_product_18 = graph.nodes.match("Product", sku="SKU-18").first()
print(f"SKU-18 的新库存: {
              updated_product_18['stock']}, 价格: {
              updated_product_18['price']}")

COALESCE 函数在这里是点睛之笔。它返回参数列表中的第一个非 null 值。这使得我们的更新数据可以很灵活,有的只包含新库存,有的只包含新价格,而查询依然能正确处理。

3.2 事务批处理:控制大型导入的粒度

UNWIND 非常适合处理几千到几万条记录的批量操作。但是,当面对百万级甚至千万级的海量数据导入时,如果试图将所有数据加载到一个 Python 列表中,然后通过一次 UNWIND 调用塞进一个巨大的事务里,灾难就会发生。

单一巨型事务的问题:

客户端内存爆炸: 在 Python 中创建一个包含数百万个字典的列表会消耗巨量的内存。
服务器内存爆炸: Neo4j 在处理事务时,会在内存中保留事务状态(Transaction State)。一个修改了数百万个实体的大型事务会占用大量堆内存(Heap Memory),极易导致数据库因 OutOfMemoryError 而崩溃。
网络传输问题: 将一个巨大的参数(可能几百MB甚至上GB)通过 Bolt 协议发送也可能导致超时或失败。

解决方案:分批提交(Periodic Commits)
正确的做法是将整个数据集分割成合理大小的“块”(Chunks)或“批次”(Batches),然后对每个批次执行一次 UNWIND 查询,并将其包裹在一个独立的事务中提交。

Python 客户端实现批处理:

import time
from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n:LogEntry) DETACH DELETE n")

# 假设我们有一个非常大的日志数据生成器,而不是一个列表
def generate_log_entries(count):
    for i in range(count):
        yield {
            "event_id": f"evt-{
              i}", "level": "INFO", "timestamp": time.time()}

# 定义批处理导入函数
def batch_import_logs(log_generator, batch_size=5000):
    """
    一个通用的批处理导入函数。
    :param log_generator: 一个生成数据字典的迭代器或生成器。
    :param batch_size: 每个批次的大小。
    """
    total_imported = 0
    batch = [] # 用于存放当前批次数据的列表
    
    # 定义要重复执行的 UNWIND 查询
    import_query = """
        UNWIND $batch_data AS log
        CREATE (:LogEntry {
            event_id: log.event_id, 
            level: log.level, 
            timestamp: log.timestamp
        })
    """
    
    print(f"开始批处理导入,批次大小为 {
              batch_size}...")
    start_time = time.time()
    
    for log_entry in log_generator:
        batch.append(log_entry) // 将数据项添加到当前批次
        if len(batch) >= batch_size: // 当批次达到设定大小时
            # 使用显式事务来提交这个批次
            with graph.begin() as tx:
                tx.run(import_query, batch_data=batch)
                // 执行 UNWIND 查询
            total_imported += len(batch)
            print(f"已提交 {
              total_imported} 条记录...")
            batch.clear() // 清空批次列表,准备下一个批次

    # 处理最后一个不满 batch_size 的批次
    if batch: // 如果循环结束后 batch 中仍有数据
        with graph.begin() as tx:
            tx.run(import_query, batch_data=batch)
        total_imported += len(batch)
        print(f"已提交最后的 {
              len(batch)} 条记录...")
        batch.clear()
        
    end_time = time.time()
    print(f"
批处理导入完成。共导入 {
              total_imported} 条记录,总耗时: {
              end_time - start_time:.4f} 秒。")

# --- 执行导入 ---
# 我们来导入 100,000 条日志记录
log_count = 100000
batch_import_logs(generate_log_entries(log_count), batch_size=10000)
// 使用一个生成器来避免一次性在内存中创建所有数据

# 验证
final_count = graph.run("MATCH (l:LogEntry) RETURN count(l)").evaluate()
print(f"数据库中最终的 LogEntry 节点数: {
              final_count}")

这个 batch_import_logs 函数是可重用的,并且内存效率极高。它只在内存中保留一个批次的数据。with graph.begin() as tx: 确保了每个批次都在自己的事务中原子地提交。

服务端批处理:apoc.periodic.iterate (终极方案)

对于真正海量(亿级)的数据导入,即使是客户端批处理,大量的网络往返(虽然比逐条好,但批次多了往返也多)依然会成为瓶颈。APOC(Awesome Procedures On Cypher)库提供了一个完美的服务器端解决方案:apoc.periodic.iterate

它的工作方式是:

你提供第一个 Cypher 查询,它负责生成一个数据流
你提供第二个 Cypher 查询,它会对数据流中的每一个元素执行操作
APOC 库在后台自动处理批处理和提交,无需客户端干预。

这几乎将所有逻辑都移到了数据库服务器上,客户端只需发起一次调用,极大地减少了网络流量和客户端的复杂性。

from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n:SensorReading) DETACH DELETE n")

# 准备一批数据,这次我们仍然在客户端准备,但一次性发送给 APOC
sensor_data = [{
            "sensor_id": f"s-{
              i//10}", "value": i*0.1, "time": i} for i in range(100000)]
// 准备 100,000 条传感器读数

print("使用 apoc.periodic.iterate 进行服务器端批处理导入...")

# apoc.periodic.iterate 的 Cypher 查询
# 注意它的结构
apoc_query = """
    CALL apoc.periodic.iterate(
        'UNWIND $readings AS reading RETURN reading', // 第一个查询:从参数中 UNWIND 数据流
        'CREATE (:SensorReading {sensor_id: reading.sensor_id, value: reading.value, time: reading.time})', // 第二个查询:对流中每项执行的操作
        {batchSize: 10000, parallel: true} // 配置对象:批次大小和是否并行执行
    )
    YIELD batches, total, timeTaken
    RETURN batches, total, timeTaken
"""
# 'UNWIND $readings AS reading RETURN reading' 是一个迭代器语句,它从我们传入的 $readings 参数中产生数据行。
# 'CREATE ...' 是动作语句,它会在每一行上执行。
# {batchSize: 10000, parallel: true} 是一个配置 map。
#   - batchSize: APOC 内部每处理多少条记录就提交一次事务。
#   - parallel: 是否尝试并行执行(对于独立的 CREATE 操作是安全的)。

# 从 py2neo 调用它
# 客户端只需要发送一次请求
result = graph.run(apoc_query, readings=sensor_data).data()
// 将整个数据列表作为参数 readings 传入

print("服务器端批处理完成。")
print(f"  - APOC 报告的分批数: {
              result[0]['batches']}")
print(f"  - APOC 报告的总处理数: {
              result[0]['total']}")
print(f"  - APOC 报告的耗时(秒): {
              result[0]['timeTaken']}")

# 验证
final_count = graph.run("MATCH (s:SensorReading) RETURN count(s)").evaluate()
print(f"数据库中最终的 SensorReading 节点数: {
              final_count}")

选择批处理策略:

< 1,000 条记录: 简单的 graph.creategraph.mergefor 循环中可能可以接受,但 UNWIND 依然是更好的习惯。
1,000 到 50,000 条记录: 单次 UNWIND 查询是最佳选择。
50,000 到数百万条记录: 使用客户端批处理(Python for 循环 + UNWIND + 事务)是健壮且高效的选择。
数百万到亿级记录: apoc.periodic.iterate 是无可争议的王者。它将性能和稳定性提升到极致。

选择合适的批次大小(batch_size
这是一个需要权衡的参数,没有万能的答案。

太小(如 100):事务开销和网络延迟会重新成为瓶颈。
太大(如 1,000,000):会导致服务器内存压力过大。
一个好的起点是 1,00050,000 之间。最佳值取决于:
节点/关系的复杂度:属性多、字符串长的实体,批次应该小一些。
服务器配置:堆内存(Heap Size)越大的服务器,可以承受越大的批次。
操作类型:简单的 CREATE 比复杂的 MERGEON CREATE SET 对内存的要求低。
最好的方法是在接近生产的环境中,用不同的批次大小进行测试,观察服务器的内存使用情况和导入速度,找到那个“甜点”(sweet spot)。

**第三章:批量操作的艺术:UNWIND 与事务批处理 **

3.3 MERGE 的精妙之处:幂等性、条件逻辑与陷阱规避

在图数据库的操作中,除了纯粹的创建(CREATE)和读取(MATCH),我们更常面对的场景是“如果不存在则创建,如果存在则更新”,也就是所谓的“Upsert”操作。Cypher 中的 MERGE 子句正是为这种幂等性(Idempotent)操作而设计的。幂等性意味着无论一个操作执行一次还是多次,产生的结果都是相同的。这对于构建健壮、可重试的数据管道至关重要。然而,MERGE 也是一个双刃剑,如果不深入理解其工作机制,很容易掉入陷阱,导致意外的数据重复或性能问题。

CREATE vs. MERGE 的根本区别:

CREATE: 总是创建新的节点或关系。如果你执行 CREATE (:Person {name: 'Alice'}) 两次,数据库中就会有两个独立的、都叫 ‘Alice’ 的 Person 节点。
MERGE: 寻找一个与指定模式完全匹配的现有实体。

如果找到了,MERGE 就会绑定到那个已存在的实体上。
如果没找到MERGE 就会创建一个新的实体来匹配该模式。

MERGE 的完整语法剖析:

MERGE 的真正威力体现在它的两个可选子句上:ON CREATEON MATCH

MERGE (pattern)
  ON CREATE SET ...  -- 当模式被新创建时执行
  ON MATCH SET ...   -- 当模式匹配到已有实体时执行

这使得我们可以在一个原子操作内实现复杂的条件逻辑。

应用场景一:幂等的用户配置(Profile)管理

这是一个经典的 Upsert 场景。当一个用户登录或更新信息时,我们需要一个查询来处理:

如果用户是第一次出现,创建他/她的节点并设置初始属性(如注册时间)。
如果用户已存在,更新他/她的属性(如最后登录时间)。

from py2neo import Graph
from datetime import datetime

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n:User) DETACH DELETE n")
try:
    graph.schema.create_uniqueness_constraint("User", "email")
    print("为 (User.email) 创建了唯一性约束。")
except Exception:
    print("(User.email) 的唯一性约束已存在。")

def upsert_user(user_email, user_name, region_info):
    """
    一个幂等的函数,用于创建或更新用户信息。
    """
    upsert_query = """
        // MERGE子句会基于 User 标签和 email 属性去寻找节点
        // 因为 email 上有唯一性约束,所以这个查找非常快,并且保证最多只会找到一个
        MERGE (u:User {email: $email}) 
        
        // ON CREATE 子句只会在节点是新创建的时候执行
        ON CREATE SET 
            u.name = $name,
            u.region = $region,
            u.created_at = datetime(), // 记录创建时间
            u.last_seen = datetime()
            
        // ON MATCH 子句只会在节点是匹配到已存在节点的时候执行
        ON MATCH SET 
            u.name = $name, // 也许用户的名字会变
            u.region = $region,
            u.last_seen = datetime() // 更新最后访问时间
            
        RETURN u.created_at AS created, u.last_seen AS seen
        // 返回时间戳以供验证
    """
    
    params = {
            
        "email": user_email,
        "name": user_name,
        "region": region_info
    }
    
    return graph.run(upsert_query, **params).data()[0]

# --- 第一次调用:创建用户 ---
print("--- 第一次调用 (创建) ---")
result1 = upsert_user("alice@example.com", "Alice W.", "EU")
# 将创建时间戳和最后访问时间戳转换为 Python datetime 对象以便比较
created_time_1 = result1['created'].to_native()
seen_time_1 = result1['seen'].to_native()
print(f"用户 Alice 创建于: {
              created_time_1}")
print(f"用户 Alice 最后访问于: {
              seen_time_1}")
# 此时,两个时间戳应该几乎完全相同

# --- 第二次调用:匹配并更新用户 ---
import time
time.sleep(1) # 等待一秒钟,以确保时间戳有明显变化

print("
--- 第二次调用 (匹配和更新) ---")
result2 = upsert_user("alice@example.com", "Alice Wonderland", "US") # 名字和地区都变了
created_time_2 = result2['created'].to_native()
seen_time_2 = result2['seen'].to_native()
print(f"用户 Alice 创建于: {
              created_time_2}") // 这个时间应该和第一次的创建时间相同
print(f"用户 Alice 最后访问于: {
              seen_time_2}") // 这个时间应该是新的,比第一次的访问时间晚
print(f"创建时间是否未变: {
              created_time_1 == created_time_2}")
print(f"访问时间是否已更新: {
              seen_time_1 < seen_time_2}")

# 验证数据库中的最终状态
final_alice = graph.nodes.match("User", email="alice@example.com").first()
print(f"
数据库中最终状态: 姓名='{
              final_alice['name']}', 地区='{
              final_alice['region']}'")
// 姓名和地区都应该是第二次调用时的值

这个单一的、原子的 MERGE 查询优雅地处理了两种情况,是后端服务中处理实体更新的理想模式。

MERGE 的陷阱:对完整模式的严格匹配

新手最容易犯的错误是直接 MERGE 一个包含未绑定变量的完整路径。

错误的模式:
MERGE (u:User {name: 'Alice'})-[:KNOWS]->(b:User {name: 'Bob'})

这个查询的语义是:“寻找一个Alice节点,它通过KNOWS关系连接到一个Bob节点。如果这个完整的模式不存在,就创建它”。

这会导致什么问题?假设数据库中已经有了 ‘Alice’ 和 ‘Bob’ 两个节点,但它们之间没有 KNOWS 关系。MERGE 会因为找不到完整的模式而重新创建 ‘Alice’ 和 ‘Bob’ 节点,以及它们之间的关系。最终你数据库里就会有两个 ‘Alice’ 和两个 ‘Bob’,这几乎肯定不是你想要的。

正确的模式:MATCH 然后 MERGE

正确的做法是先将模式中已经存在的实体(通常是节点)MATCH 出来,然后再 MERGE 它们之间的关系。

from py2neo import Graph, Node

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n) DETACH DELETE n")

# 先确保节点存在
graph.create(Node("User", name="Alice"))
graph.create(Node("User", name="Bob"))

# 正确的创建关系的方式
correct_rel_creation_query = """
    MATCH (u1:User {name: 'Alice'}) // 首先,明确地找到 Alice 节点
    MATCH (u2:User {name: 'Bob'})   // 其次,明确地找到 Bob 节点
    MERGE (u1)-[r:KNOWS]->(u2) // 然后,只对它们之间的关系进行 MERGE
    ON CREATE SET r.since = datetime()
    RETURN id(u1), id(u2) // 返回 ID 以验证我们操作的是已存在的节点
"""
print("--- 使用 MATCH then MERGE ---")
result = graph.run(correct_rel_creation_query).data()
print("第一次执行,关系被创建。")
original_ids = (result[0]['id(u1)'], result[0]['id(u2)'])
print(f"操作的节点ID: {
              original_ids}")

# 再次执行相同的查询
result2 = graph.run(correct_rel_creation_query).data()
print("
第二次执行,关系被匹配,没有创建新节点或关系。")
new_ids = (result2[0]['id(u1)'], result2[0]['id(u2)'])
print(f"操作的节点ID: {
              new_ids}")
print(f"节点ID是否保持不变: {
              original_ids == new_ids}")

# 检查节点总数
user_count = graph.run("MATCH (n:User) RETURN count(n)").evaluate()
print(f"最终 User 节点总数: {
              user_count}") // 结果应该是 2,而不是 4

这个 MATCH ... MATCH ... MERGE ... 的模式是创建实体间关系的黄金法则。它保证了你总是在操作已知的、正确的实体,而不会意外地复制数据。

结合 UNWIND 进行批量 Upsert

现在,我们可以将 UNWIND 的批量处理能力和 MERGE 的幂等性结合起来,创造出终极的批量数据同步工具。

假设我们从一个外部系统接收到一个产品信息流,需要同步到 Neo4j 数据库。

import time
from py2neo import Graph

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
graph.run("MATCH (n:Product) DETACH DELETE n")
try:
    graph.schema.create_uniqueness_constraint("Product", "sku")
    print("为 (Product.sku) 创建了唯一性约束。")
except Exception:
    print("(Product.sku) 的唯一性约束已存在。")

# 准备一批产品更新数据
# 有些是新产品,有些是已有产品的价格更新
product_feed = [
    {
            "sku": "PROD-001", "name": "Laptop Pro", "price": 1299.99, "category": "Electronics"}, # 新产品
    {
            "sku": "PROD-002", "name": "Wireless Mouse", "price": 49.99, "category": "Accessories"}, # 新产品
    {
            "sku": "PROD-001", "price": 1249.99}, # PROD-001 降价了
]

# 先手动创建 PROD-001,模拟它已经存在的情况
graph.run("CREATE (:Product {sku: 'PROD-001', name: 'Laptop Pro', price: 1399.99, category: 'Electronics'})")
print("数据库中已预先存在一个 PROD-001 产品,价格为 1399.99。")

# 定义批量 Upsert 查询
batch_upsert_query = """
    UNWIND $feed AS product_data // 展开产品数据流
    MERGE (p:Product {sku: product_data.sku}) // 基于 sku 进行 MERGE
    ON CREATE SET // 如果是新产品
        p.name = product_data.name,
        p.price = product_data.price,
        p.category = product_data.category,
        p.first_imported_at = datetime()
    ON MATCH SET // 如果产品已存在
        // 只更新提供了的属性
        p.price = COALESCE(product_data.price, p.price), 
        p.name = COALESCE(product_data.name, p.name),
        p.last_updated_at = datetime()
"""

# 执行批量更新
graph.run(batch_upsert_query, feed=product_feed)
print("
批量 Upsert 操作已执行。")

# --- 验证结果 ---
prod1 = graph.nodes.match("Product", sku="PROD-001").first()
prod2 = graph.nodes.match("Product", sku="PROD-002").first()

print("
--- 验证 PROD-001 (已存在的产品) ---")
print(f"  SKU: {
              prod1['sku']}")
print(f"  名称: {
              prod1['name']}") // 名称在 feed 中未提供,应保持不变
print(f"  价格: {
              prod1['price']}") // 价格应被更新为 1249.99
print(f"  首次导入时间: {
              prod1.get('first_imported_at')}") // 应该为 None,因为它不是新创建的
print(f"  最后更新时间: {
              prod1.get('last_updated_at')}") // 应该有一个新的时间戳

print("
--- 验证 PROD-002 (新创建的产品) ---")
print(f"  SKU: {
              prod2['sku']}")
print(f"  名称: {
              prod2['name']}")
print(f"  价格: {
              prod2['price']}")
print(f"  首次导入时间: {
              prod2.get('first_imported_at')}") // 应该有一个时间戳
print(f"  最后更新时间: {
              prod2.get('last_updated_at')}") // 应该为 None,因为它没有被 MATCH 到

这个 UNWIND + MERGE + ON CREATE / ON MATCH 的组合模式功能极其强大,是构建健壮数据集成管道的基石。它能够以极高的性能和完美的幂等性处理来自外部系统的、混杂着新数据和更新数据的复杂数据流。


3.4 查询剖析与优化:EXPLAINPROFILE 的威力

编写出能工作的 Cypher 查询只是第一步。在面对大规模数据集时,一个未经优化的查询可能会运行数分钟甚至数小时,或者直接耗尽服务器内存。EXPLAINPROFILE 是 Neo4j 提供的两个最重要的内省工具,它们能揭示查询的执行计划,让我们像外科医生一样精确地定位并移除性能瓶颈。

3.4.1 EXPLAIN:在执行前预演

EXPLAIN 关键字放在 Cypher 查询的开头,它会返回查询的预估执行计划(Estimated Execution Plan),但不会实际执行查询。这使得它非常安全和快速,是检查一个查询是否会使用索引、是否有明显性能问题的首选工具。

如何使用 EXPLAIN 并解读其输出:

from py2neo import Graph
import json # 使用 json 来美化打印复杂的嵌套字典

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

# --- 案例:一个没有索引的查询 ---
query_no_index = "EXPLAIN MATCH (p:Person {name: 'Tom Hanks'}) RETURN p"
print("--- 查询计划 (没有索引) ---")
plan_no_index = graph.run(query_no_index).data()
# EXPLAIN 的输出是一个复杂的嵌套结构,我们把它打印出来看看
print(json.dumps(plan_no_index, indent=2))

# --- 创建索引 ---
try:
    graph.schema.create_index("Person", "name")
    print("
为 (Person.name) 创建了 B-Tree 索引。
")
except Exception:
    pass # 索引可能已存在

# --- 再次查看查询计划 ---
query_with_index = "EXPLAIN MATCH (p:Person {name: 'Tom Hanks'}) RETURN p"
print("--- 查询计划 (有索引) ---")
plan_with_index = graph.run(query_with_index).data()
print(json.dumps(plan_with_index, indent=2))

解读查询计划的关键操作符(Operators):

AllNodesScan: 性能灾难。这意味着查询正在扫描数据库中的所有节点来寻找匹配项。这通常发生在你忘记给 MATCH 中的节点指定标签时,例如 MATCH (n {name: 'Tom'})
NodeByLabelScan: 性能瓶颈。这意味着查询正在扫描所有带有指定标签的节点。在上面的 “没有索引” 的例子中,你会看到这个操作符。如果 Person 节点有几百万个,这个查询就会非常慢。
NodeIndexSeek: 性能优秀。这意味着查询正在使用索引来直接定位和查找节点,其复杂度是对数级(O(log N))甚至接近常数级(O(1)),而不是线性级(O(N))。在上面的 “有索引” 的例子中,你会看到这个操作符。这是我们追求的目标。
NodeUniqueIndexSeek: 性能最佳。如果属性上有唯一性约束,查询会使用这个更高效的索引查找。
Expand(Into) / Expand(All): 这是图遍历的核心操作,代表从一个节点出发,通过关系“扩展”到其邻居。
Filter: 应用 WHERE 子句中的过滤条件。这个操作符的位置很关键。如果它出现在查询计划的顶层(执行后期),意味着数据库已经做了很多无用功,加载了大量数据之后才进行过滤。优化的目标之一就是将 Filter 操作尽可能地推向底层(执行前期)。
ProduceResults: 查询计划的终点,将结果返回给客户端。

通过对比 EXPLAIN 的前后输出,我们可以明确地知道创建索引是否生效,以及查询的起点是否高效。

3.4.2 PROFILE:在执行后复盘

PROFILEEXPLAIN 更进一步。它会实际执行查询,然后返回带有真实性能指标的执行计划。这些指标包括每个操作符处理的行数(rows)和它访问数据库存储文件的次数(db hits)。PROFILE 是精细化性能调优的终极武器。

db hits:最重要的优化指标
一个 “DB Hit” 代表了一次对底层存储文件(neostore.*.db.*)的访问,可以粗略地理解为一次 I/O 操作。一个查询的性能好坏,很大程度上取决于它产生了多少次 DB Hits。优化的核心目标就是用尽可能少的 DB Hits 来获得查询结果。

案例分析:优化一个复杂的查询

假设我们要查找“已关注了至少 10 个被超过 100 人已关注的大 V 的用户”。

from py2neo import Graph
import json

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
# 为了演示,创建一个小型的社交网络
graph.run("MATCH (n) DETACH DELETE n")
graph.run("""
    // 创建 10 个 "大V" (influencers)
    UNWIND range(1, 10) AS i
    CREATE (influencer:User {name: 'Influencer_' + i, type: 'influencer'})

    // 创建 1000 个普通用户
    UNWIND range(1, 1000) AS i
    CREATE (user:User {name: 'User_' + i, type: 'user'})
""")
graph.run("""
    // 创建已关注关系
    // 每个大V被 101 个用户已关注
    MATCH (i:User {type: 'influencer'})
    MATCH (u:User {type: 'user'}) WHERE rand() < 0.1 // 随机让 10% 的用户已关注每个大V
    CREATE (u)-[:FOLLOWS]->(i)

    // 让一些用户已关注多个大V
    MATCH (u:User) WHERE rand() < 0.05 // 随机选 5% 的用户
    MATCH (i:User {type: 'influencer'}) WHERE rand() < 0.3 // 让他们随机已关注 30% 的大V
    MERGE (u)-[:FOLLOWS]->(i)
""")
print("演示数据创建完毕。")

# --- 一个直观但性能可能不佳的查询 ---
# 这个查询的逻辑是:找到所有用户u,然后计算他们已关注的人中,满足“被已关注数>100”的人数,最后筛选
inefficient_query = """
PROFILE
MATCH (u:User)-[:FOLLOWS]->(followed:User) // 第一步:找到所有已关注关系
WITH u, followed
MATCH (follower:User)-[:FOLLOWS]->(followed) // 第二步:计算被已关注者的粉丝数
WITH u, followed, count(follower) AS followers_count
WHERE followers_count > 100
WITH u, count(followed) AS influential_followed_count
WHERE influential_followed_count >= 10
RETURN u.name
"""
print("
--- 低效查询的 PROFILE ---")
# 这个查询会产生巨大的中间结果(Cartesian Product),性能会很差
profile1 = graph.run(inefficient_query).data()
print(json.dumps(profile1, indent=2))
# 观察 profile1 的输出,你会看到巨大的 `rows` 和 `db hits`

# --- 一个优化后的查询 ---
# 优化的思路是:先计算出所有大V,将这个小集合固定下来,然后再去查找已关注了他们的用户
efficient_query = """
PROFILE
// 第一步:先找出所有粉丝数 > 100 的大V,并收集成一个列表
// 这一步的计算量是可控的
MATCH (follower:User)-[:FOLLOWS]->(influencer:User)
WITH influencer, count(follower) AS followers_count
WHERE followers_count > 100
WITH COLLECT(influencer) AS influencers // 使用 COLLECT 将结果聚合成一个列表

// 第二步:展开这个大V列表
UNWIND influencers AS i
// 第三步:找到已关注这些大V的用户
MATCH (u:User)-[:FOLLOWS]->(i)
// 第四步:按用户分组,并计算他们已关注的大V数量
WITH u, count(i) AS influential_followed_count
WHERE influential_followed_count >= 10
RETURN u.name
"""
print("
--- 高效查询的 PROFILE ---")
profile2 = graph.run(efficient_query).data()
print(json.dumps(profile2, indent=2))
# 观察 profile2 的输出,你会发现 `rows` 和 `db hits` 都显著降低了

优化前后的对比分析:

低效查询的问题: MATCH (u:User)-[:FOLLOWS]->(followed:User) 这一步会匹配出图中所有的已关注关系,产生巨大的行数。接着的 WITH u, followed 会将这个巨大的结果集传递给下一步。这被称为“基数爆炸”(Cardinality Explosion)。查询的大部分时间都花在处理这些海量的中间结果上。
高效查询的优势: WITH COLLECT(influencer) AS influencers 是关键。它首先识别出数量相对较少的大V集合(在这个例子中只有 10 个),然后将他们收集起来。COLLECT 是一个聚合函数,它会急剧地降低基数(Cardinality),将多行结果缩减为只有一行(这一行包含一个列表)。后续的查询都是基于这个小得多的 influencers 列表进行的,从而避免了处理海量的中间数据,db hitsrows 都大大减少。

使用 WITH 控制基数是 Cypher 性能优化最高级的技巧之一。它允许你有意识地在查询的中间点插入一个“屏障”,聚合和筛选数据,将处理的行数控制在可管理的范围内,然后再进行下一步的扩展和匹配。始终记住:尽早过滤,尽早聚合,让Cypher的每一步都工作在尽可能小的数据集上。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容