Skip to content

Commit

Permalink
Workflow deep and workflow connection performance (#2664)
Browse files Browse the repository at this point in the history
* feat: Workflow dispatch deep

* perf: workflow connection
  • Loading branch information
c121914yu committed Sep 10, 2024
1 parent 7473be5 commit aeba792
Show file tree
Hide file tree
Showing 21 changed files with 301 additions and 1,327 deletions.
3 changes: 3 additions & 0 deletions docSite/content/zh-cn/docs/development/upgrading/4811.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ weight: 813
5. 新增 - 插件支持配置使用引导、全局变量和文件输入。
6. 新增 - 简易模式支持新的版本管理方式。
7. 新增 - 聊天记录滚动加载,不再只加载 30 条。
8. 优化 - 工作流嵌套层级限制 20 层,避免因编排不合理导致的无限死循环。
9. 优化 - 工作流 handler 性能优化。
10. 修复 - 知识库选择权限问题。
3 changes: 2 additions & 1 deletion packages/global/core/workflow/runtime/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export enum DispatchNodeResponseKeyEnum {
assistantResponses = 'assistantResponses', // assistant response
rewriteHistories = 'rewriteHistories', // If have the response, workflow histories will be rewrite

interactive = 'INTERACTIVE' // is interactive
interactive = 'INTERACTIVE', // is interactive
runTimes = 'runTimes' // run times
}

export const needReplaceReferenceInputTypeList = [
Expand Down
2 changes: 2 additions & 0 deletions packages/global/core/workflow/runtime/type.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type ChatDispatchProps = {
maxRunTimes: number;
isToolCall?: boolean;
workflowStreamResponse?: WorkflowResponseType;
workflowDispatchDeep?: number;
};

export type ModuleDispatchProps<T> = ChatDispatchProps & {
Expand Down Expand Up @@ -181,6 +182,7 @@ export type DispatchNodeResultType<T = {}> = {
[DispatchNodeResponseKeyEnum.toolResponses]?: ToolRunResponseItemType; // Tool response
[DispatchNodeResponseKeyEnum.assistantResponses]?: AIChatItemValueItemType[]; // Assistant response(Store to db)
[DispatchNodeResponseKeyEnum.rewriteHistories]?: ChatItemType[];
[DispatchNodeResponseKeyEnum.runTimes]?: number;
} & T;

/* Single node props */
Expand Down
3 changes: 3 additions & 0 deletions packages/service/core/workflow/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export const WORKFLOW_MAX_RUN_TIMES = process.env.WORKFLOW_MAX_RUN_TIMES
? parseInt(process.env.WORKFLOW_MAX_RUN_TIMES)
: 500;
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ export const runToolWithFunctionCall = async (
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages: filterMessages,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
};
}

Expand All @@ -310,7 +313,10 @@ export const runToolWithFunctionCall = async (
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
}
);
} else {
Expand All @@ -330,7 +336,8 @@ export const runToolWithFunctionCall = async (
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value],
runTimes: (response?.runTimes || 0) + 1
};
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
dispatchFlowResponse, // tool flow response
totalTokens,
completeMessages = [], // The actual message sent to AI(just save text)
assistantResponses = [] // FastGPT system store assistant.value response
assistantResponses = [], // FastGPT system store assistant.value response
runTimes
} = await (async () => {
const adaptMessages = chats2GPTMessages({ messages, reserveId: false });

Expand Down Expand Up @@ -195,6 +196,7 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
const previewAssistantResponses = filterToolResponseToPreview(assistantResponses);

return {
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[NodeOutputKeyEnum.answerText]: previewAssistantResponses
.filter((item) => item.text?.content)
.map((item) => item.text?.content || '')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ export const runToolWithPromptCall = async (
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value],
runTimes: (response?.runTimes || 0) + 1
};
}

Expand Down Expand Up @@ -318,7 +319,8 @@ ANSWER: `;
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages: filterMessages,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes: (response?.runTimes || 0) + toolsRunResponse.moduleRunResponse.runTimes
};
}

Expand All @@ -330,7 +332,8 @@ ANSWER: `;
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes: (response?.runTimes || 0) + toolsRunResponse.moduleRunResponse.runTimes
}
);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ export const runToolWithToolChoice = async (
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
};
}

Expand All @@ -338,7 +341,10 @@ export const runToolWithToolChoice = async (
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
}
);
} else {
Expand All @@ -358,7 +364,8 @@ export const runToolWithToolChoice = async (
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value],
runTimes: (response?.runTimes || 0) + 1
};
}
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { RuntimeNodeItemType } from '@fastgpt/global/core/workflow/runtime/
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
import type { DispatchFlowResponse } from '../../type.d';
import { AIChatItemValueItemType, ChatItemValueItemType } from '@fastgpt/global/core/chat/type';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';

export type DispatchToolModuleProps = ModuleDispatchProps<{
[NodeInputKeyEnum.history]?: ChatItemType[];
Expand All @@ -25,6 +26,7 @@ export type RunToolResponse = {
totalTokens: number;
completeMessages?: ChatCompletionMessageParam[];
assistantResponses?: AIChatItemValueItemType[];
[DispatchNodeResponseKeyEnum.runTimes]: number;
};
export type ToolNodeItemType = RuntimeNodeItemType & {
toolParams: RuntimeNodeItemType['inputs'];
Expand Down
34 changes: 32 additions & 2 deletions packages/service/core/workflow/dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,31 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
...props
} = data;

// 初始化深度和自动增加深度,避免无限嵌套
if (!props.workflowDispatchDeep) {
props.workflowDispatchDeep = 1;
} else {
props.workflowDispatchDeep += 1;
}

if (props.workflowDispatchDeep > 20) {
return {
flowResponses: [],
flowUsages: [],
debugResponse: {
finishedNodes: [],
finishedEdges: [],
nextStepRunNodes: []
},
[DispatchNodeResponseKeyEnum.runTimes]: 1,
[DispatchNodeResponseKeyEnum.assistantResponses]: [],
[DispatchNodeResponseKeyEnum.toolResponses]: null,
newVariables: removeSystemVariable(variables)
};
}

let workflowRunTimes = 0;

// set sse response headers
if (stream && res) {
res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');
Expand Down Expand Up @@ -154,7 +179,8 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
nodeDispatchUsages,
toolResponses,
assistantResponses,
rewriteHistories
rewriteHistories,
runTimes = 1
}: Omit<
DispatchNodeResultType<{
[NodeOutputKeyEnum.answerText]?: string;
Expand All @@ -163,6 +189,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
'nodeResponse'
>
) {
// Add run times
workflowRunTimes += runTimes;
props.maxRunTimes -= runTimes;

if (responseData) {
chatResponses.push(responseData);
}
Expand Down Expand Up @@ -330,7 +360,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
const nodeRunResult = await (() => {
if (status === 'run') {
nodeRunBeforeHook(node);
props.maxRunTimes--;
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
Expand Down Expand Up @@ -565,6 +594,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
finishedEdges: runtimeEdges,
nextStepRunNodes: debugNextStepRunNodes
},
[DispatchNodeResponseKeyEnum.runTimes]: workflowRunTimes,
[DispatchNodeResponseKeyEnum.assistantResponses]:
mergeAssistantResponseAnswerText(chatAssistantResponse),
[DispatchNodeResponseKeyEnum.toolResponses]: toolRunResponse,
Expand Down
3 changes: 2 additions & 1 deletion packages/service/core/workflow/dispatch/plugin/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
appId: String(plugin.id)
};

const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
const { flowResponses, flowUsages, assistantResponses, runTimes } = await dispatchWorkFlow({
...props,
runningAppInfo: {
id: String(plugin.id),
Expand All @@ -92,6 +92,7 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
return {
assistantResponses,
// responseData, // debug
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[DispatchNodeResponseKeyEnum.nodeResponse]: {
moduleLogo: plugin.avatar,
totalPoints: usagePoints,
Expand Down
3 changes: 2 additions & 1 deletion packages/service/core/workflow/dispatch/plugin/runApp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
appId: String(appData._id)
};

const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
const { flowResponses, flowUsages, assistantResponses, runTimes } = await dispatchWorkFlow({
...props,
runningAppInfo: {
id: String(appData._id),
Expand Down Expand Up @@ -107,6 +107,7 @@ export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
const { text } = chatValue2RuntimePrompt(assistantResponses);

return {
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[DispatchNodeResponseKeyEnum.nodeResponse]: {
moduleLogo: appData.avatar,
query: userChatInput,
Expand Down
1 change: 1 addition & 0 deletions packages/service/core/workflow/dispatch/type.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export type DispatchFlowResponse = {
};
[DispatchNodeResponseKeyEnum.toolResponses]: ToolRunResponseItemType;
[DispatchNodeResponseKeyEnum.assistantResponses]: AIChatItemValueItemType[];
[DispatchNodeResponseKeyEnum.runTimes]: number;
newVariables: Record<string, string>;
};

Expand Down
Loading

0 comments on commit aeba792

Please sign in to comment.