在 Python 中分析数据流¶
您可以使用 CodeQL 来跟踪数据在 Python 程序中的流动情况,一直到数据被使用的地方。
关于本文档¶
本文档介绍了如何在适用于 Python 的 CodeQL 库中实现数据流分析,并包含一些示例,以帮助您编写自己的数据流查询。以下部分介绍了如何使用库进行本地数据流、全局数据流和污点跟踪。有关数据流建模的更一般性介绍,请参见“关于数据流分析。”
注意
此处描述的新数据流模块化 API 从 CodeQL 2.13.0 开始与 CodeQL 之前的库一起提供。有关库更改方式以及如何将任何现有查询迁移到模块化 API 的信息,请参见 CodeQL 查询编写的新数据流 API。
本地数据流¶
本地数据流是指单个方法或可调用函数中的数据流。与全局数据流相比,本地数据流更简单、更快、更精确,并且足以满足许多查询的需求。
使用本地数据流¶
本地数据流库位于模块 DataFlow
中,该模块定义了表示任何数据可以流经的元素的类 Node
。Node
类包含一些有用的子类,例如 ExprNode
(用于表达式)、CfgNode
(用于控制流节点)、CallCfgNode
(用于函数和方法调用)以及 ParameterNode
(用于参数)。您可以使用成员谓词 asExpr
和 asCfgNode
在数据流节点和表达式/控制流节点之间映射
class Node {
/** Gets the expression corresponding to this node, if any. */
Expr asExpr() { ... }
/** Gets the control-flow node corresponding to this node, if any. */
ControlFlowNode asCfgNode() { ... }
...
}
或者使用谓词 exprNode
/**
* Gets a node corresponding to expression `e`.
*/
ExprNode exprNode(Expr e) { ... }
由于控制流图被拆分,因此单个表达式可能与多个数据流节点相关联。
谓词 localFlowStep(Node nodeFrom, Node nodeTo)
成立,如果从节点 nodeFrom
到节点 nodeTo
存在直接数据流边。您可以通过使用 +
和 *
运算符递归地应用该谓词,或者使用预定义的递归谓词 localFlow
。
例如,您可以在零个或多个本地步骤中找到从表达式 source
到表达式 sink
的流
DataFlow::localFlow(DataFlow::exprNode(source), DataFlow::exprNode(sink))
使用本地污点跟踪¶
本地污点跟踪通过包括非值保留流步骤来扩展本地数据流。例如
temp = x
y = temp + ", " + temp
如果 x
是一个受污点的字符串,那么 y
也是受污点的。
本地污点跟踪库位于模块 TaintTracking
中。与本地数据流类似,谓词 localTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo)
成立,如果从节点 nodeFrom
到节点 nodeTo
存在直接污点传播边。您可以通过使用 +
和 *
运算符递归地应用该谓词,或者使用预定义的递归谓词 localTaint
。
例如,您可以在零个或多个本地步骤中找到从表达式 source
到表达式 sink
的污点传播
TaintTracking::localTaint(DataFlow::exprNode(source), DataFlow::exprNode(sink))
使用本地源¶
当请求两个表达式之间的本地数据流或污点传播时,您通常会将表达式限制为与特定调查相关。下一节将提供一些具体的示例,但还有一个更抽象的概念,我们应该明确地提出来,那就是本地源。
本地源是一个数据流节点,没有本地数据流进入它。因此,它是一个本地数据流的起源,数据流从这里开始。这包括参数(它们只接收全局数据流)以及大多数表达式(因为它们不是值保留的)。将注意力限制在这些本地源上,可以得到一个更轻量级且更具性能的数据流图,并且在大多数情况下,它也是调查关注事项的更合适的抽象。类 LocalSourceNode
表示也属于本地源的数据流节点。它带有有用的成员谓词 flowsTo(DataFlow::Node node)
,如果从本地源到 node
存在本地数据流,则该谓词成立。
示例¶
Python 具有用于读取和写入文件的内置功能,例如函数 open
。但是,还存在库 os
,它提供了低级文件访问功能。此查询查找传递给 os.open
的文件名
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call
where
call = API::moduleImport("os").getMember("open").getACall()
select call.getArg(0)
请注意,使用 API
模块来引用库函数。有关更多信息,请参见“在 Python 中使用 API 图。”
不幸的是,此查询只会给出参数中的表达式,而不是传递给它的值。因此,我们使用本地数据流来查找所有流入参数的表达式
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call, DataFlow::ExprNode expr
where
call = API::moduleImport("os").getMember("open").getACall() and
DataFlow::localFlow(expr, call.getArg(0))
select call, expr
通常,您会看到一个表达式的几个数据流节点,因为它流向调用(请注意 call
列中重复出现的位置)。我们主要对这些节点中的“第一个”感兴趣,也就是文件名可能称为本地源。为了将注意力限制在这些本地源上,并同时提高分析的性能,我们有 QL 类 LocalSourceNode
。我们可以要求 expr
是这样的一个节点
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call, DataFlow::ExprNode expr
where
call = API::moduleImport("os").getMember("open").getACall() and
DataFlow::localFlow(expr, call.getArg(0)) and
expr instanceof DataFlow::LocalSourceNode
select call, expr
但是,我们也可以通过强制转换来实现这一点。这将允许我们像这样在 LocalSourceNode
上使用成员函数 flowsTo
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call, DataFlow::ExprNode expr
where
call = API::moduleImport("os").getMember("open").getACall() and
expr.(DataFlow::LocalSourceNode).flowsTo(call.getArg(0))
select call, expr
作为替代方案,我们可以更直接地通过谓词 getALocalSource
来要求 expr
是第一个参数的本地源
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call, DataFlow::ExprNode expr
where
call = API::moduleImport("os").getMember("open").getACall() and
expr = call.getArg(0).getALocalSource()
select call, expr
这三个查询都给出相同的结果。现在,我们每个调用基本上只有一个表达式。
我们仍然有一些情况,多个表达式流向一个调用,但它们会通过不同的代码路径流动(可能是由于控制流拆分)。
我们可能想要使源更具体,例如函数或方法的参数。此查询查找使用参数作为名称打开文件的情况
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call, DataFlow::ParameterNode p
where
call = API::moduleImport("os").getMember("open").getACall() and
DataFlow::localFlow(p, call.getArg(0))
select call, p
对于大多数代码库来说,这只会返回几个结果,并且可以手动检查这些结果。
使用通过参数提供的精确名称可能过于严格。如果我们想知道参数是否影响文件名,我们可以使用污点跟踪而不是数据流。此查询查找对 os.open
的调用,其中文件名是根据参数派生的
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
from DataFlow::CallCfgNode call, DataFlow::ParameterNode p
where
call = API::moduleImport("os").getMember("open").getACall() and
TaintTracking::localTaint(p, call.getArg(0))
select call, p
通常,这会找到更多结果。
全局数据流¶
全局数据流跟踪整个程序中的数据流,因此比本地数据流更强大。但是,全局数据流不如本地数据流精确,并且分析通常需要更长的时间和更多的内存才能完成。
注意
您可以通过创建路径查询来在 CodeQL 中对数据流路径进行建模。要查看适用于 VS Code 的 CodeQL 生成的路径查询,您需要确保它具有正确元数据和
select
子句。有关更多信息,请参见 创建路径查询。
使用全局数据流¶
全局数据流库通过实现签名 DataFlow::ConfigSig
并应用模块 DataFlow::Global<ConfigSig>
来使用
import python
module MyFlowConfiguration implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
...
}
predicate isSink(DataFlow::Node sink) {
...
}
}
module MyFlow = DataFlow::Global<MyFlowConfiguration>;
这些谓词在配置中定义
isSource
- 定义数据可能从哪里流动。isSink
- 定义数据可能流向哪里。isBarrier
- 可选地,限制数据流。isAdditionalFlowStep
- 可选地,添加额外的流步骤。
数据流分析是使用谓词 flow(DataFlow::Node source, DataFlow::Node sink)
执行的
from DataFlow::Node source, DataFlow::Node sink
where MyFlow::flow(source, sink)
select source, "Dataflow to $@.", sink, sink.toString()
使用全局污点跟踪¶
全局污点跟踪与全局数据流的关系类似于本地污点跟踪与本地数据流的关系。也就是说,全局污点跟踪通过添加额外的非值保留步骤来扩展全局数据流。全局污点跟踪库通过将模块 TaintTracking::Global<ConfigSig>
应用到您的配置而不是 DataFlow::Global<ConfigSig>
来使用
import python
module MyFlowConfiguration implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
...
}
predicate isSink(DataFlow::Node sink) {
...
}
}
module MyFlow = TaintTracking::Global<MyFlowConfiguration>;
生成的模块与从 DataFlow::Global<ConfigSig>
获得的模块具有相同的签名。
预定义的源和接收器¶
数据流库包含许多预定义的源和接收器,为定义基于数据流的安全查询提供了良好的起点。
- 类
RemoteFlowSource
(在模块semmle.python.dataflow.new.RemoteFlowSources
中定义)表示来自远程网络输入的数据流。这对于在网络服务中查找安全问题非常有用。 - 库
Concepts
(在模块semmle.python.Concepts
中定义)包含DataFlow::Node
的几个子类,它们与安全相关,例如FileSystemAccess
和SqlExecution
。 - 模块
Attributes
(在模块semmle.python.dataflow.new.internal.Attributes
中定义)定义了AttrRead
和AttrWrite
,它们处理普通和动态属性访问。
对于全局流,将源限制为 LocalSourceNode
的实例也很有用。预定义的源通常会这样做。
类层次结构¶
DataFlow::Node
- 一个充当数据流节点的元素。DataFlow::CfgNode
- 一个充当数据流节点的控制流节点。DataFlow::ExprNode
- 一个充当数据流节点的表达式。DataFlow::ParameterNode
- 一个参数数据流节点,表示函数入口处参数的值。DataFlow::CallCfgNode
- 函数或方法调用的控制流节点,充当数据流节点。
RemoteFlowSource
- 来自网络/远程输入的数据流。Attributes::AttrRead
- 一个充当数据流节点的属性读取。Attributes::AttrWrite
- 一个充当数据流节点的属性写入。Concepts::SystemCommandExecution
- 一个执行操作系统命令的数据流节点,例如通过生成新进程。Concepts::FileSystemAccess
- 一个执行文件系统访问的数据流节点,包括读取和写入数据、创建和删除文件和文件夹、检查和更新权限等等。Concepts::Path::PathNormalization
- 一个执行路径规范化的数据流节点。这通常需要为了安全地访问路径。Concepts::Decoding
- 一个从二进制或文本格式解码数据的数据流节点。解码(自动)保留从输入到输出的污染。但是,它本身也可能成为一个问题,例如如果它允许代码执行或可能导致拒绝服务。Concepts::Encoding
- 一个将数据编码为二进制或文本格式的数据流节点。编码(自动)保留从输入到输出的污染。Concepts::CodeExecution
- 一个动态执行 Python 代码的数据流节点。Concepts::SqlExecution
- 一个执行 SQL 语句的数据流节点。Concepts::HTTP::Server::RouteSetup
- 一个在服务器上设置路由的数据流节点。Concepts::HTTP::Server::HttpResponse
- 一个在服务器上创建 HTTP 响应的数据流节点。
示例¶
此查询显示了一个数据流配置,该配置使用所有网络输入作为数据源
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.Concepts
module RemoteToFileConfiguration implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
source instanceof RemoteFlowSource
}
predicate isSink(DataFlow::Node sink) {
sink = any(FileSystemAccess fa).getAPathArgument()
}
}
module RemoteToFileFlow = TaintTracking::Global<RemoteToFileConfiguration>;
from DataFlow::Node input, DataFlow::Node fileAccess
where RemoteToFileFlow::flow(input, fileAccess)
select fileAccess, "This file access uses data from $@.",
input, "user-controllable input."
此数据流配置跟踪从环境变量到打开文件的 数据流
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
module EnvironmentToFileConfiguration implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
source = API::moduleImport("os").getMember("getenv").getACall()
}
predicate isSink(DataFlow::Node sink) {
exists(DataFlow::CallCfgNode call |
call = API::moduleImport("os").getMember("open").getACall() and
sink = call.getArg(0)
)
}
}
module EnvironmentToFileFlow = DataFlow::Global<EnvironmentToFileConfiguration>;
from Expr environment, Expr fileOpen
where EnvironmentToFileFlow::flow(DataFlow::exprNode(environment), DataFlow::exprNode(fileOpen))
select fileOpen, "This call to 'os.open' uses data from $@.",
environment, "call to 'os.getenv'"
进一步阅读¶
- 使用路径查询探索数据流 在 GitHub 文档中。