列表分发连接
相信您在AI应用的实际开发场景中,也经常会遇到这样的情况:
- 让模型生成了一个执行任务清单,接下来需要根据执行任务清单的任务项顺序,将任务项逐个分发到任务执行工作流程去
- 使用搜索工具或是知识库查询取回了一个原始信息清单,需要将清单中的原始信息项目逐个分发到加工处理工作流程去
- 用户上传了一个巨大的文件,进行文本切块之后,需要将切块list中的每一个切块项分发到文本块处理工作流程去
当遇到这样的情况时,显然之前我们提到的所有连接关系都不是最佳处理方案。这时候,您可以使用 Agently Workflow 针对这种场景特别提供的.loop_with()
方法进行连接处理,这个方法将能够从返回列表(list)
、字典(dict)
或是整数(int)
类型结果的上游工作块将相关数据逐个分发到指向的承接处理任务的子工作流中。
Note
请注意,与其他连接关系不同,.loop_with()
的括号中需要指向的是一个承接单个项目处理任务的workflow对象,而不是一个工作块
列表分发连接的工作过程示意图
从上图可以看到,.loop_with()
方法将会拆解上游工作块提供的输入数据,并逐个分发到指向的workflow对象中,然后等待workflow对象处理完毕之后,再将处理结果重新组合为列表返回给下游工作块。
.loop_with()对不同类型输入数据的处理方式
列表(list)
列表(list)
是.loop_with()
最常见的输入数据类型,当上游工作块提供的输入数据为列表时,.loop_with()
方法将按顺序遍历列表中的每一个元素,并将每个元素依次分发到指向的workflow对象中。
import Agently
workflow = Agently.Workflow()
sub_workflow = Agently.Workflow()
@workflow.chunk()
def send_list(inputs, storage):
return [1, 2, 3]
@workflow.chunk()
def print_to_chunk_inputs(inputs, storage):
print("[to chunk inputs]: ", inputs)
return
@sub_workflow.chunk()
def print_sub_workflow_inputs(inputs, storage):
print("[sub workflow inputs]: ", inputs)
return inputs
(
sub_workflow
.connect_to("print_sub_workflow_inputs")
.connect_to("END")
)
(
workflow
.connect_to("send_list")
.loop_with(sub_workflow)
.connect_to("print_to_chunk_inputs")
.connect_to("END")
)
workflow.start()
[sub workflow inputs]: {'default': 1}
[sub workflow inputs]: {'default': 2}
[sub workflow inputs]: {'default': 3}
[to chunk inputs]: {'default': {'default': [{'default': 1}, {'default': 2}, {'default': 3}]}}
%%{ init: { 'flowchart': { 'curve': 'linear' }, 'theme': 'neutral' } }%%
%% Rendered By Agently %%
flowchart LR
classDef chunk_style fill:#fbfcdb,stroke:#666,stroke-width:1px,color:#333;
classDef condition_chunk_style fill:#ECECFF,stroke:#9370DB,stroke-width:1px,color:#333;
classDef loop_style fill:#f5f7fa,stroke:#666,stroke-width:1px,color:#333,stroke-dasharray: 5 5
subgraph Loop_1
direction LR
773963d1-0cd9-4d6d-bf8a-9d6ce3d9ef56("START"):::chunk_style -.-> |"* -->-- default"| 087a6555-353c-41c1-bc79-3087fd36d683("print_sub_workflow_inputs"):::chunk_style
087a6555-353c-41c1-bc79-3087fd36d683("print_sub_workflow_inputs"):::chunk_style -.-> |"* -->-- default"| b8e2117b-3edb-4f65-a31c-f2aef4420375("END"):::chunk_style
end
813947f8-b2ea-4a14-8002-105d1851904c("START"):::chunk_style -.-> |"* -->-- default"| d472d70f-6311-4e23-8d2e-81a01a9179cc("send_list"):::chunk_style
d472d70f-6311-4e23-8d2e-81a01a9179cc("send_list"):::chunk_style -.-> |"* -->-- default"| Loop_1:::loop_style
Loop_1:::loop_style -.-> |"* -->-- default"| 2f621910-6478-4e30-a1f3-c41758e53662("print_to_chunk_inputs"):::chunk_style
2f621910-6478-4e30-a1f3-c41758e53662("print_to_chunk_inputs"):::chunk_style -.-> |"* -->-- default"| f896c515-19f0-4c2e-ba23-a310f406e11d("END"):::chunk_style
字典(dict)
当上游工作块提供的输入数据为字典时,.loop_with()
方法将按顺序遍历字典中所有的键值对,并以{ "key": "<键名>", "value": "<键值>" }
格式分发到指向的workflow对象中。
import Agently
workflow = Agently.Workflow()
sub_workflow = Agently.Workflow()
@workflow.chunk()
def send_dict(inputs, storage):
return { "a": 1, "b": "what?", "c": ["Agently", "Workflow"], "d": { "Hello": "World" } }
@workflow.chunk()
def print_to_chunk_inputs(inputs, storage):
print("[to chunk inputs]: ", inputs)
return
@sub_workflow.chunk()
def print_sub_workflow_inputs(inputs, storage):
print("[sub workflow inputs]: ", inputs)
return inputs
(
sub_workflow
.connect_to("print_sub_workflow_inputs")
.connect_to("END")
)
(
workflow
.connect_to("send_dict")
.loop_with(sub_workflow)
.connect_to("print_to_chunk_inputs")
.connect_to("END")
)
workflow.start()
[sub workflow inputs]: {'default': {'key': 'a', 'value': 1}}
[sub workflow inputs]: {'default': {'key': 'b', 'value': 'what?'}}
[sub workflow inputs]: {'default': {'key': 'c', 'value': ['Agently', 'Workflow']}}
[sub workflow inputs]: {'default': {'key': 'd', 'value': {'Hello': 'World'}}}
[to chunk inputs]: {'default': {'default': [{'default': {'key': 'a', 'value': 1}}, {'default': {'key': 'b', 'value': 'what?'}}, {'default': {'key': 'c', 'value': ['Agently', 'Workflow']}}, {'default': {'key': 'd', 'value': {'Hello': 'World'}}}]}}
整数(int)
当上游工作块提供的输入数据为整数时,.loop_with()
方法将按顺序遍历从0
到输入数据-1
的整数的列表,并将每个元素依次分发到指向的workflow对象中。
import Agently
workflow = Agently.Workflow()
sub_workflow = Agently.Workflow()
@workflow.chunk()
def send_int(inputs, storage):
return 3
@workflow.chunk()
def print_to_chunk_inputs(inputs, storage):
print("[to chunk inputs]: ", inputs)
return
@sub_workflow.chunk()
def print_sub_workflow_inputs(inputs, storage):
print("[sub workflow inputs]: ", inputs)
return inputs
(
sub_workflow
.connect_to("print_sub_workflow_inputs")
.connect_to("END")
)
(
workflow
.connect_to("send_int")
.loop_with(sub_workflow)
.connect_to("print_to_chunk_inputs")
.connect_to("END")
)
workflow.start()
[sub workflow inputs]: {'default': 0}
[sub workflow inputs]: {'default': 1}
[sub workflow inputs]: {'default': 2}
[to chunk inputs]: {'default': {'default': [{'default': 0}, {'default': 1}, {'default': 2}]}}
子工作流的输出格式
观察上面的代码案例,您可能会注意到,在使用.loop_with()
方法之后,输出数据的格式看起来有些奇怪,里面携带了很多default
键,为了让您能够更好理解这里面的层次结构,我们以最常见的分发列表代码示例的输出结果为例,对输出结果的层次结构进行说明。
让我们再次观察一下分发列表代码实例的运行结果:
[sub workflow inputs]: {'default': 1}
[sub workflow inputs]: {'default': 2}
[sub workflow inputs]: {'default': 3}
[to chunk inputs]: {'default': {'default': [{'default': 1}, {'default': 2}, {'default': 3}]}}
[to chunk inputs]
的输出数据中,最内层的列表中,每个元素都是sub_workflow
的输出数据,它们与向子工作流输入的数据项数量和顺序都保持一致,所以我们可以将[to chunk inputs]
改写为以下形式:
那么剩下两个default
键,又是从哪里来的呢?实际上,第一个default
代表了print_to_chunk_inputs
工作块承接数据的输入端点,而第二个default
则代表了sub_workflow
的END
块接收数据并进行输出的接收端点
让我们再对样例代码做一下处理,就能清晰观察到这里的层次结构:
import Agently
workflow = Agently.Workflow()
sub_workflow = Agently.Workflow()
@workflow.chunk()
def send_list(inputs, storage):
return [1, 2, 3]
@workflow.chunk()
def print_to_chunk_inputs(inputs, storage):
print("[to chunk inputs]: ", inputs)
return
# 对子工作流中的数据进行解析和传递
@sub_workflow.chunk()
def print_sub_workflow_inputs(inputs, storage):
# 在这里对sub_workflow起点块对默认的"default"端点输入的数据进行解析
print("[sub workflow inputs]: ", inputs["default"])
# 向下游传递时,也只传递从"default"中取出的数据
# 其实这一段模拟的是我们在真实场景中,对输入数据的处理逻辑
# 通常我们都会基于输入数据,撰写更多的业务逻辑,然后将业务逻辑结果向下传递
# 而不是直接将输入数据向下传递
return inputs["default"]
(
sub_workflow
.connect_to("print_sub_workflow_inputs")
# 传递结果时,传递到END块的"return_value"端点
# 而不是默认的"default"端点
.connect_to("END.return_value")
)
(
workflow
.connect_to("send_list")
.loop_with(sub_workflow)
# 承接结果时,使用"print_to_chunk_inputs"的"input_handle"端点
# 而不是默认的"default"端点
.connect_to("print_to_chunk_inputs.input_handle")
.connect_to("END")
)
workflow.start()
[sub workflow inputs]: 1
[sub workflow inputs]: 2
[sub workflow inputs]: 3
[to chunk inputs]: {'input_handle': {'return_value': [1, 2, 3]}}
通过这样的改写,我们就可以清晰地看到整个输出数据中,不同层次的default
键所代表的含义。
同时,在通过workflow.draw()
输出的工作流图里,也对不同的端点信息进行了标注:
%%{ init: { 'flowchart': { 'curve': 'linear' }, 'theme': 'neutral' } }%%
%% Rendered By Agently %%
flowchart LR
classDef chunk_style fill:#fbfcdb,stroke:#666,stroke-width:1px,color:#333;
classDef condition_chunk_style fill:#ECECFF,stroke:#9370DB,stroke-width:1px,color:#333;
classDef loop_style fill:#f5f7fa,stroke:#666,stroke-width:1px,color:#333,stroke-dasharray: 5 5
subgraph Loop_1
direction LR
b0c1ed4a-eb8d-4ff0-9ea9-265da807bcf0("START"):::chunk_style -.-> |"* -->-- default"| d60f2276-a5df-417c-a87c-a9774a0e9441("print_sub_workflow_inputs"):::chunk_style
d60f2276-a5df-417c-a87c-a9774a0e9441("print_sub_workflow_inputs"):::chunk_style -.-> |"* -->-- return_value"| 99661986-ce72-4bb7-bb39-4e39893fe9ad("END"):::chunk_style
end
f52d6690-45c9-4c28-9747-f0d063405894("START"):::chunk_style -.-> |"* -->-- default"| af24e769-b0d0-4483-90cb-75c80bad7496("send_list"):::chunk_style
af24e769-b0d0-4483-90cb-75c80bad7496("send_list"):::chunk_style -.-> |"* -->-- default"| Loop_1:::loop_style
Loop_1:::loop_style -.-> |"* -->-- input_handle"| 0c6796ba-31f8-4fc4-8160-f53697816d3c("print_to_chunk_inputs"):::chunk_style
0c6796ba-31f8-4fc4-8160-f53697816d3c("print_to_chunk_inputs"):::chunk_style -.-> |"* -->-- default"| 78d909c2-905c-40cd-a572-4ab448144fa4("END"):::chunk_style
在.loop_with()中使用函数而不是子工作流进行处理
.loop_with()
也可以接受函数作为参数,使用这种方式,我们可以快速在工作流中执行一些需要根据上游块输出数据进行的循环处理,例如:
import Agently
workflow = Agently.Workflow()
sub_workflow = Agently.Workflow()
@workflow.chunk()
def send_list(inputs, storage):
return [1, "a", { "b": "Agently" }]
@workflow.chunk()
def print_inputs(inputs, storage):
print(".loop_with() Result:", inputs["default"])
return inputs["default"]
def loop_with_executor(item, storage):
return { "value": item }
(
workflow
.connect_to("send_list")
# 您也可以尝试使用lambda函数进一步简化代码
.loop_with(loop_with_executor)
.connect_to("print_inputs")
.connect_to("END")
)
workflow.start()
print(workflow.draw())
%%{ init: { 'flowchart': { 'curve': 'linear' }, 'theme': 'neutral' } }%%
%% Rendered By Agently %%
flowchart LR
classDef chunk_style fill:#fbfcdb,stroke:#666,stroke-width:1px,color:#333;
classDef condition_chunk_style fill:#ECECFF,stroke:#9370DB,stroke-width:1px,color:#333;
classDef loop_style fill:#f5f7fa,stroke:#666,stroke-width:1px,color:#333,stroke-dasharray: 5 5
subgraph Loop_1
direction LR
f5af3987-7f14-4b85-bad9-11e22df1b69e(loop_with_executor):::chunk_style
end
52216f0e-0d36-4bd9-adda-6647725bf4d8("START"):::chunk_style -.-> |"* -->-- default"| 988825fb-84f1-412d-a97c-50612a61e508("send_list"):::chunk_style
988825fb-84f1-412d-a97c-50612a61e508("send_list"):::chunk_style -.-> |"* -->-- default"| Loop_1:::loop_style
Loop_1:::loop_style -.-> |"* -->-- default"| 80a31daa-e5d4-4678-bcdb-28c536ba7528("print_inputs"):::chunk_style
80a31daa-e5d4-4678-bcdb-28c536ba7528("print_inputs"):::chunk_style -.-> |"* -->-- default"| 7e73fedd-cbb7-42e0-8272-8c066693512e("END"):::chunk_style