Eino
ChatModel 组件
ChatModel 是 Eino 框架中对对话大模型的抽象,它提供了统一的接口来与不同的大模型服务(如 OpenAI、Ollama 等)进行交互。
Model 组件是一个用于与大语言模型交互的组件。它的主要作用是将用户的输入消息发送给语言模型,并获取模型的响应。适合用于以下场景:
- 自然语言对话
- 文本生成
- 生成多模态交互(文本、图片等)
- 工具调用的参数
接口定义与三种方法
type BaseChatModel interface {
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
*schema.StreamReader[*schema.Message], error)
}
type ToolCallingChatModel interface {
BaseChatModel
// WithTools returns a new ToolCallingChatModel instance with the specified tools bound.
// This method does not modify the current instance, making it safer for concurrent use.
WithTools(tools []*schema.ToolInfo) (ToolCallingChatModel, error)
}Generate 方法
- 功能:生成完整的模型响应
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Managerinput:输入消息列表opts:可选参数,用于配置模型行为
- 返回值:
*schema.Message:模型生成的响应消息error:生成过程中的错误信息
Stream 方法
- 功能:以流式方式生成模型响应
- 参数:与
Generate方法相同 - 返回值:
*schema.StreamReader[*schema.Message]:模型响应的流式读取器error:生成过程中的错误信息
WithTools 方法
- 功能:为模型绑定可用的工具
- 参数:
tools:工具信息列表
- 返回值:
ToolCallingChatModel: 绑定了 tools 后的 chatmodelerror:绑定过程中的错误信息
schema.Message 结构体
type Message struct {
// Role 表示消息的角色(system/user/assistant/tool)
Role RoleType `json:"role"`
// // Content 是消息的文本内容
Content string `json:"content"`
// if MultiContent is not empty, use this instead of Content
// if MultiContent is empty, use Content
// MultiContent 是多模态内容,支持文本、图片、音频等
// Deprecated: 已废弃,使用 UserInputMultiContent 替代
MultiContent []ChatMessagePart `json:"multi_content,omitempty"`
// UserInputMultiContent 用来存储用户输入的多模态数据,支持文本、图片、音频、视频、文件
// 使用此字段时限制模型角色为 User
UserInputMultiContent []MessageInputPart `json:"user_input_multi_content,omitempty"`
// AAssistantGenMultiContent 用来承接模型输出的多模态数据,支持文本、图片、音频、视频
// 使用此字段时限制模型角色为 Assistant
AssistantGenMultiContent []MessageOutputPart `json:"assistant_output_multi_content,omitempty"`
// Name 是消息的发送者名称
Name string `json:"name,omitempty"`
// ToolCalls 是 assistant 消息中的工具调用信息
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
// ToolCallID 是 tool 消息的工具调用 ID
ToolCallID string `json:"tool_call_id,omitempty"`
// only for ToolMessage
ToolName string `json:"tool_name,omitempty"`
// ResponseMeta 包含响应的元信息
ResponseMeta *ResponseMeta `json:"response_meta,omitempty"`
// ReasoningContent 是模型的推理过程,当模型返回推理内容时会包含该部分
ReasoningContent string `json:"reasoning_content,omitempty"`
// Extra 用于存储额外信息
Extra map[string]any `json:"extra,omitempty"`
}Generate 完整生成
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
chatModel, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
if err != nil {
panic(err)
}
input := []*schema.Message{
schema.SystemMessage("你是一个可爱的高中美少女"),
schema.UserMessage("你好"),
}
response, err := chatModel.Generate(ctx, input)
if err != nil {
panic(err)
}
fmt.Println(response.Content)
}Stream 流式生成
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
chatModel, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
if err != nil {
panic(err)
}
input := []*schema.Message{
schema.SystemMessage("你是一个可爱的高中美少女"),
schema.UserMessage("你好"),
}
streamReader, err := chatModel.Stream(ctx, input)
if err != nil {
panic(err)
}
defer streamReader.Close()
for {
chunk, err := streamReader.Recv()
if err == io.EOF {
break
}
if err != nil {
panic(err)
}
fmt.Println(chunk.Content)
}
}ChatTemplate 组件
接口定义
type ChatTemplate interface {
Format(ctx context.Context, vs map[string]any, opts ...Option) ([]*schema.Message, error)
}prompt.FromMessages()- 用于把多个 message 变成一个 chat template。
schema.Message{}schema.Message是实现了Format接口的结构体,因此可直接构建schema.Message{}作为 template
schema.SystemMessage()- 此方法是构建 role 为“system”的 message 快捷方法
schema.AssistantMessage()- 此方法是构建 role 为“assistant”的 message 快捷方法
schema.UserMessage()- 此方法是构建 role 为“user”的 message 快捷方法
schema.ToolMessage()- 此方法是构建 role 为“tool”的 message 快捷方法
schema.MessagesPlaceholder()- 可用于把一个
[]*schema.Message插入到 message 列表中,常用于插入历史对话
- 可用于把一个
Format 方法
- 功能:将变量值填充到消息模板中
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Managervs:变量值映射,用于填充模板中的占位符opts:可选参数,用于配置格式化行为
- 返回值:
[]*schema.Message:格式化后的消息列表error:格式化过程中的错误信息
基本使用
- 使用
prompt.FromMessages以及提供的schema.FString格式化 - 构造
params参数 - 使用
template自带的Format构建即可
示例代码
package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
chatModel, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
if err != nil {
panic(err)
}
template := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个{role}"),
&schema.Message{
Role: schema.User,
Content: "请帮帮我,史瓦罗先生,帮我解决{task}",
},
)
params := map[string]any{
"role": "机器人史瓦罗先生",
"task": "帮我刷星琼",
}
msg, err := template.Format(ctx, params)
if err != nil {
panic(err)
}
response, err := chatModel.Generate(ctx, msg)
if err != nil {
panic(err)
}
fmt.Println(response.Content)
}RAG 概念
RAG 的核心思想是在 LLM 生成回复之前,先去外部数据源中查找相关信息,然后将这些信息作为额外上下文提供给 LLM,引导其生成更准确的回答。
为什么需要 RAG
- 减少大模型幻觉现象,提高回答的准确性和相关度
- 构建专门领域的回答资料库
RAG 三步走
- 数据准备(索引)
- 收集和处理外部数据(提前搜集):将各种形式的外部数据(如文档、数据库、网页、API 等)收集起来。这些数据可以是企业的内部知识库,也可以是互联网上的公开信息。
- 切分和嵌入(Transformer):将这些数据切分成小块(称为“chunk”),并使用嵌入模型将每个小块转换为高维度的数字向量(称为“嵌入”)。这些嵌入能够捕捉文本的语义信息。
- 存储到向量数据库(Embedding&Indexer):将这些向量存储在专门的向量数据库中,以便后续进行高效的相似性搜索。
- 检索(Reteriever)
- 用户查询:当用户提出问题或输入查询时,RAG 系统会首先将这个查询也转换为一个向量。
- 相似性搜索:然后,系统会在向量数据库中进行相似性搜索,找到与用户查询最相关的文档块(或片段)。这个过程就像在海量信息中找到最相关的“证据”。
- 增强生成
- 构建增强提示:将用户原始的查询和从向量数据库中检索到的相关信息(作为上下文)一起发送给大型语言模型。
- LLM 生成回答:LLM 在接收到这个“增强”后的提示后,会根据其自身的训练知识和提供的外部上下文来生成最终的回答。这样,LLM 不再仅仅依赖于其训练时的数据,而是有了“实时”的参考资料。
Embedding 组件
Embedding 组件是一个用于将文本转换为向量表示的组件。它的主要作用是将文本内容映射到向量空间,使得语义相似的文本在向量空间中的距离较近。常用的场景有:
- 文本相似度计算
- 文本聚类分析
- 语义搜索
接口定义
type Embedder interface {
EmbedStrings(ctx context.Context, texts []string, opts ...Option) ([][]float64, error) // invoke
}EmbedStrings 方法
- 功能:将一组文本转换为向量表示
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Managertexts:待转换的文本列表opts:转换选项,用于配置转换行为
- 返回值:
[][]float64:文本对应的向量表示列表,每个向量的维度由具体的实现决定error:转换过程中的错误信息
示例代码
package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/embedding/ark"
"github.com/joho/godotenv"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
// 初始化嵌入器
embedder, err := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("EMBEDDER"),
})
if err != nil {
panic(err)
}
input := []string{
"你好,泥嚎",
"微服务",
"大模型",
}
embeddings, err := embedder.EmbedStrings(ctx, input)
if err != nil {
panic(err)
}
for i, embedding := range embeddings {
fmt.Printf("文本 (%d) 的向量维度是 %d\n", i+1, len(embedding))
}
}Indexer 组件
Indexer 组件是一个用于存储和索引文档的组件。它的主要作用是将文档及其向量表示存储到后端存储系统中,并提供高效的检索能力。常用的场景有:
- 构建向量数据库,以用于语义关联搜索
接口定义
type Indexer interface {
// Store stores the documents.
Store(ctx context.Context, docs []*schema.Document, opts ...Option) (ids []string, err error) // invoke
}Store 方法
- 功能:存储文档并建立索引
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Managerdocs:待存储的文档列表opts:存储选项,用于配置存储行为
- 返回值:
ids:存储成功的文档 ID 列表error:存储过程中的错误信息
示例代码
package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/embedding/ark"
ri "github.com/cloudwego/eino-ext/components/indexer/redis"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
goredis "github.com/redis/go-redis/v9"
)
var redisClient *goredis.Client
func initRedis() {
redisClient = goredis.NewClient(&goredis.Options{
Addr: os.Getenv("REDIS_ADDR"),
Protocol: 2,
UnstableResp3: true,
})
}
func createIndex() {
ctx := context.Background()
keyPrefix := "eino:" // keyPrefix should be the prefix of keys you write to redis and want to retrieve.
indexName := "my_index" // indexName should be used in redis retriever.
// schemas should match DocumentToHashes configured in IndexerConfig.
schemas := []*goredis.FieldSchema{
{
FieldName: "content",
FieldType: goredis.SearchFieldTypeText,
Weight: 1,
},
{
FieldName: "vector_content",
FieldType: goredis.SearchFieldTypeVector,
VectorArgs: &goredis.FTVectorArgs{
FlatOptions: &goredis.FTFlatOptions{
Type: "FLOAT32", // BFLOAT16 / FLOAT16 / FLOAT32 / FLOAT64. BFLOAT16 and FLOAT16
Dim: 2560, // keeps same with dimensions of Embedding
DistanceMetric: "COSINE", // L2 / IP / COSINE
},
HNSWOptions: nil,
},
},
{
FieldName: "extra_field_number",
FieldType: goredis.SearchFieldTypeNumeric,
},
}
options := &goredis.FTCreateOptions{
OnHash: true,
Prefix: []any{keyPrefix},
}
result, err := redisClient.FTCreate(ctx, indexName, options, schemas...).Result()
if err != nil && err.Error() != "Index already exists" {
panic(err)
}
fmt.Println(result)
}
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
initRedis()
createIndex()
ctx := context.Background()
embedder, err := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("EMBEDDER"),
})
if err != nil {
panic(err)
}
indexer, err := ri.NewIndexer(ctx, &ri.IndexerConfig{
Client: redisClient,
KeyPrefix: "eino:",
Embedding: embedder,
})
if err != nil {
panic(err)
}
ids, err := indexer.Store(ctx, []*schema.Document{
{
ID: "1",
Content: "张恒华同学是上海工程技术大学的学生,研究方向是软件定义网络。",
MetaData: map[string]any{
"author": "Howard",
},
},
{
ID: "2",
Content: "你说得对,但是 PHP 是世界上最好的语言。",
MetaData: map[string]any{
"author": "Howard",
},
},
{
ID: "3",
Content: "张恒华的英文名为 Howard Zhang。",
MetaData: map[string]any{
"author": "Howard",
},
},
{
ID: "4",
Content: "张恒华同学今年 18 岁了。",
MetaData: map[string]any{
"author": "Howard",
},
},
})
if err != nil {
panic(err)
}
fmt.Println(ids)
}Retriever 组件
Retriever 组件是一个用于从各种数据源检索文档的组件。它的主要作用是根据用户的查询(query)从文档库中检索出最相关的文档。常用的场景有:
- 基于向量相似度的文档检索
- 基于关键词的文档搜索
- 知识库问答系统(rag)
接口定义
// Retriever is the interface for retriever.
// It is used to retrieve documents from a source.
//
// e.g.
//
// retriever, err := redis.NewRetriever(ctx, &redis.RetrieverConfig{})
// if err != nil {...}
// docs, err := retriever.Retrieve(ctx, "query") // <= using directly
// docs, err := retriever.Retrieve(ctx, "query", retriever.WithTopK(3)) // <= using options
//
// graph := compose.NewGraph[inputType, outputType](compose.RunTypeDAG)
// graph.AddRetrieverNode("retriever_node_key", retriever) // <= using in graph
type Retriever interface {
Retrieve(ctx context.Context, query string, opts ...Option) ([]*schema.Document, error)
}Retrieve 方法
- 功能:根据查询检索相关文档
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Managerquery:查询字符串opts:检索选项,用于配置检索行为
- 返回值:
[]*schema.Document:检索到的文档列表error:检索过程中的错误信息
Option 结构体
type Options struct {
// Index 是检索器使用的索引,不同检索器中的索引可能有不同含义
Index *string
// SubIndex 是检索器使用的子索引,不同检索器中的子索引可能有不同含义
SubIndex *string
// TopK 是检索的文档数量上限
TopK *int
// ScoreThreshold 是文档相似度的阈值,例如 0.5 表示文档的相似度分数必须大于 0.5
ScoreThreshold *float64
// Embedding 是用于生成查询向量的组件
Embedding embedding.Embedder
// DSLInfo 是用于检索的 DSL 信息,仅在 viking 类型的检索器中使用
DSLInfo map[string]interface{}
}示例代码
package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/embedding/ark"
rr "github.com/cloudwego/eino-ext/components/retriever/redis"
"github.com/joho/godotenv"
goredis "github.com/redis/go-redis/v9"
)
var redisClient *goredis.Client
func initRedis() {
redisClient = goredis.NewClient(&goredis.Options{
Addr: os.Getenv("REDIS_ADDR"),
Protocol: 2,
UnstableResp3: true,
})
}
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
initRedis()
ctx := context.Background()
embedder, err := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("EMBEDDER"),
})
if err != nil {
panic(err)
}
retriever, err := rr.NewRetriever(ctx, &rr.RetrieverConfig{
Client: redisClient,
Index: "my_index",
VectorField: "vector_content",
ReturnFields: []string{"content", "vector_content"},
TopK: 2,
Embedding: embedder,
})
if err != nil {
panic(err)
}
docs, err := retriever.Retrieve(ctx, "张恒华")
if err != nil {
panic(err)
}
for index, doc := range docs {
fmt.Println(index, doc)
}
}Document Transformer 组件
Document Transformer 是一个用于文档转换和处理的组件。它的主要作用是对输入的文档进行各种转换操作,如分割、过滤、合并等,从而得到满足特定需求的文档。常用的场景有:
- 将长文档分割成小段落以便于处理
- 提取文档中的特定部分
- 根据特定规则过滤文档内容
- 对文档内容进行结构化转换
接口定义
Transform 方法
- 功能:对输入的文档进行转换处理
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Managersrc:待处理的文档列表opts:可选参数,用于配置转换行为
- 返回值:
[]*schema.Document:转换后的文档列表error:转换过程中的错误信息
示例代码
package main
import (
"context"
"fmt"
"github.com/cloudwego/eino-ext/components/document/transformer/splitter/markdown"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
goredis "github.com/redis/go-redis/v9"
)
var redisClient *goredis.Client
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
splitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderConfig{
Headers: map[string]string{
"#": "h1",
"##": "h2",
"###": "h3",
},
TrimHeaders: true, // 是否在输出的内容中移除标题行
})
if err != nil {
panic(err)
}
docs := []*schema.Document{
{
ID: "doc1",
Content: `# 文档标题
这是介绍部分的内容。
## 第一章
这是第一章的内容。
### 1.1 节
这是 1.1 节的内容。
## 第二章
这是第二章的内容。`,
},
}
results, err := splitter.Transform(ctx, docs)
if err != nil {
panic(err)
}
for _, result := range results {
fmt.Println(result.Content)
for k, v := range result.MetaData {
fmt.Println("\t", k, v)
}
}
}RAG Demo
https://github.com/zhh2001/RAG_Learning
Tool 组件
ToolsNode 组件是一个用于扩展模型能力的组件,它允许模型调用外部工具来完成特定的任务。这个组件可用于以下场景中:
- 让模型能够获取实时信息(如搜索引擎、天气查询等)
- 使模型能够执行特定的操作(如数据库操作、API 调用等)
- 扩展模型的能力范围(如数学计算、代码执行等)
- 与外部系统集成(如知识库查询、插件系统等)
接口定义
// BaseTool 基础工具接口,提供工具信息
type BaseTool interface {
Info(ctx context.Context) (*schema.ToolInfo, error)
}
// InvokableTool 支持同步调用的工具接口
type InvokableTool interface {
BaseTool
// InvokableRun call function with arguments in JSON format
InvokableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (string, error)
}
// StreamableTool 支持流式输出的工具接口
type StreamableTool interface {
BaseTool
StreamableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (*schema.StreamReader[string], error)
}Info 方法
- 功能:获取工具的描述信息
- 参数:
ctx:上下文对象
- 返回值:
*schema.ToolInfo:工具的描述信息,用于提供给大模型error:获取信息过程中的错误
InvokableRun 方法
- 功能:同步执行工具
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback ManagerargumentsInJSON:JSON 格式的参数字符串opts:工具执行的选项
- 返回值:
string:执行结果error:执行过程中的错误
StreamableRun 方法
- 功能:以流式方式执行工具
- 参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback ManagerargumentsInJSON:JSON 格式的参数字符串opts:工具执行的选项
- 返回值:
*schema.StreamReader[string]:流式执行结果error:执行过程中的错误
ToolInfo 结构体
type ToolInfo struct {
// 工具的唯一名称,用于清晰地表达其用途
Name string
// 用于告诉模型如何/何时/为什么使用这个工具
// 可以在描述中包含少量示例
Desc string
// Extra is the extra information for the tool.
Extra map[string]any
// 工具接受的参数定义
// 可以通过两种方式描述:
// 1. 使用 ParameterInfo:schema.NewParamsOneOfByParams(params)
// 2. 使用 OpenAPIV3:schema.NewParamsOneOfByOpenAPIV3(openAPIV3)
// If is nil, signals that the tool does not need any input parameter
*ParamsOneOf
}示例代码
package main
import (
"context"
"fmt"
"net/http"
"time"
"github.com/bytedance/sonic"
req "github.com/cloudwego/eino-ext/components/tool/httprequest/get"
)
func main() {
ctx := context.Background()
config := &req.Config{
Headers: map[string]string{
"User-Agent": "MyCustomAgent",
},
HttpClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{},
},
}
tool, err := req.NewTool(ctx, config)
if err != nil {
panic(err)
}
request := &req.GetRequest{
URL: "https://zhh2001.github.io/sitemap",
}
jsonReq, err := sonic.Marshal(request)
if err != nil {
panic(err)
}
resp, err := tool.InvokableRun(ctx, string(jsonReq))
if err != nil {
panic(err)
}
fmt.Println(resp)
}创建一个 tool:
package main
import (
"context"
"strings"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/components/tool/utils"
"github.com/cloudwego/eino/schema"
)
type Note struct {
Name string `json:"name"`
Url string `json:"url"`
}
type InputParams struct {
Name string `json:"name" jsonschema:"description=技术名"`
}
func GetNote(ctx context.Context, params *InputParams) (string, error) {
NoteSet := []Note{
{Name: "P4", Url: "https://zhh2001.github.io/sdn/p4"},
{Name: "INT", Url: "https://zhh2001.github.io/sdn/int"},
{Name: "Mininet", Url: "https://zhh2001.github.io/sdn/mininet"},
{Name: "iPerf", Url: "https://zhh2001.github.io/sdn/iperf"},
}
for _, note := range NoteSet {
if strings.ToLower(params.Name) == strings.ToLower(note.Name) {
return note.Url, nil
}
}
return "", nil
}
func CreateTool() tool.InvokableTool {
GetNoteTool := utils.NewTool(&schema.ToolInfo{
Name: "get_note",
Desc: "获取笔记的链接",
ParamsOneOf: schema.NewParamsOneOfByParams(
map[string]*schema.ParameterInfo{
"name": {
Type: schema.String,
Desc: "技术名",
Required: true,
},
},
),
}, GetNote)
return GetNoteTool
}编排
Chain

package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
model, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
if err != nil {
panic(err)
}
lambda := compose.InvokableLambda(func(ctx context.Context, input string) (output []*schema.Message, err error) {
meow := input + "回答结尾加上‘喵’"
output = []*schema.Message{
{
Role: schema.User,
Content: meow,
},
}
return output, nil
})
chain := compose.NewChain[string, *schema.Message]()
chain.AppendLambda(lambda).AppendChatModel(model)
r, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
answer, err := r.Invoke(ctx, "你好,可以告诉我你的名字吗")
if err != nil {
panic(err)
}
fmt.Println(answer.Content)
}Graph

package main
import (
"context"
"fmt"
"github.com/cloudwego/eino/compose"
)
func main() {
ctx := context.Background()
lambda0 := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
switch input {
case "1":
return "猫", nil
case "2":
return "狗", nil
default:
return "人", nil
}
})
lambda1 := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return "喵喵喵", nil
})
lambda2 := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return "汪汪汪", nil
})
lambda3 := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return "嘤嘤嘤", nil
})
graph := compose.NewGraph[string, string]()
err := graph.AddLambdaNode("lambda0", lambda0)
if err != nil {
panic(err)
}
err = graph.AddLambdaNode("lambda1", lambda1)
if err != nil {
panic(err)
}
err = graph.AddLambdaNode("lambda2", lambda2)
if err != nil {
panic(err)
}
err = graph.AddLambdaNode("lambda3", lambda3)
if err != nil {
panic(err)
}
err = graph.AddBranch("lambda0", compose.NewGraphBranch(func(ctx context.Context, in string) (endNode string, err error) {
switch in {
case "猫":
return "lambda1", nil
case "狗":
return "lambda2", nil
case "人":
return "lambda3", nil
default:
return compose.END, nil
}
}, map[string]bool{
"lambda1": true,
"lambda2": true,
"lambda3": true,
}))
if err != nil {
panic(err)
}
err = graph.AddEdge(compose.START, "lambda0")
if err != nil {
panic(err)
}
err = graph.AddEdge("lambda1", compose.END)
if err != nil {
panic(err)
}
err = graph.AddEdge("lambda2", compose.END)
if err != nil {
panic(err)
}
err = graph.AddEdge("lambda3", compose.END)
if err != nil {
panic(err)
}
r, err := graph.Compile(ctx)
if err != nil {
panic(err)
}
output, err := r.Invoke(ctx, "3")
if err != nil {
panic(err)
}
fmt.Println(output)
}GraphWithModel
package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
model, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
if err != nil {
panic(err)
}
graph := compose.NewGraph[map[string]string, *schema.Message]()
err = graph.AddChatModelNode("model", model)
if err != nil {
panic(err)
}
lambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output map[string]string, err error) {
switch input["role"] {
case "tsundere":
return map[string]string{
"role": "傲娇",
"content": input["content"],
}, nil
case "cute":
return map[string]string{
"role": "可爱",
"content": input["content"],
}, nil
default:
return map[string]string{
"role": "user",
"content": input["content"],
}, nil
}
})
tsundereLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output []*schema.Message, err error) {
return []*schema.Message{
{
Role: schema.System,
Content: "你是一个高冷傲娇的大小姐",
},
{
Role: schema.System,
Content: input["content"],
},
}, nil
})
cuteLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output []*schema.Message, err error) {
return []*schema.Message{
{
Role: schema.System,
Content: "你是一个可爱的女高中生",
},
{
Role: schema.System,
Content: input["content"],
},
}, nil
})
err = graph.AddLambdaNode("lambda", lambda)
if err != nil {
panic(err)
}
err = graph.AddLambdaNode("tsundereLambda", tsundereLambda)
if err != nil {
panic(err)
}
err = graph.AddLambdaNode("cuteLambda", cuteLambda)
if err != nil {
panic(err)
}
graphBranch := compose.NewGraphBranch(func(ctx context.Context, in map[string]string) (endNode string, err error) {
switch in["role"] {
case "傲娇":
return "tsundereLambda", nil
case "可爱":
return "cuteLambda", nil
default:
return "tsundereLambda", nil
}
}, map[string]bool{
"tsundereLambda": true,
"cuteLambda": true,
})
err = graph.AddBranch("lambda", graphBranch)
if err != nil {
panic(err)
}
err = graph.AddEdge(compose.START, "lambda")
if err != nil {
panic(err)
}
err = graph.AddEdge("tsundereLambda", "model")
if err != nil {
panic(err)
}
err = graph.AddEdge("cuteLambda", "model")
if err != nil {
panic(err)
}
err = graph.AddEdge("model", compose.END)
if err != nil {
panic(err)
}
r, err := graph.Compile(ctx)
if err != nil {
panic(err)
}
output, err := r.Invoke(ctx, map[string]string{
"role": "tsundere",
"content": "你好啊",
})
if err != nil {
panic(err)
}
fmt.Println(output.Content)
}GraphWithState
State 定义
Graph 可以有自身的全局状态,在创建 Graph 时传入 compose.WithGenLocalState Option 开启此功能。这个请求维度的全局状态在一次请求的各环节可读写使用。
Eino 推荐用 StatePreHandler 和 StatePostHandler,功能定位是:
StatePreHandler:在每个节点执行前读写 State,以及按需替换节点的 Input。输入需对齐节点的非流式输入类型。StatePostHandler:在每个节点执行后读写 State,以及按需替换节点的 Output。输入需对齐节点的非流式输出类型。
针对流式场景,使用对应的 StreamStatePreHandler 和 StreamStatePostHandler,输入需分别对齐节点的流式输入和流式输出类型。
这些 state handlers 位于节点外部,通过对 Input 或 Output 的修改影响节点,从而保证了节点的状态无关特性。
如果需要在节点内部读写 State,Eino 提供了对应函数:
func ProcessState[S any](ctx context.Context, handler func(context.Context, S) error) error {
s, pMu, err := getState[S](ctx)
if err != nil {
return fmt.Errorf("get state from context fail: %w", err)
}
pMu.Lock()
defer pMu.Unlock()
return handler(ctx, s)
}Eino 框架会在所有读写 State 的位置加锁。
示例代码
package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
)
type State struct {
History map[string]any
}
func getFunc(ctx context.Context) *State {
return &State{
History: make(map[string]any),
}
}
func main() {
err := godotenv.Load(".env")
if err != nil {
panic(err)
}
ctx := context.Background()
model, _ := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
graph := compose.NewGraph[map[string]string, *schema.Message](compose.WithGenLocalState(getFunc))
_ = graph.AddChatModelNode("model", model)
lambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output map[string]string, err error) {
// 内部使用 state
_ = compose.ProcessState(ctx, func(ctx context.Context, s *State) error {
s.History["tsundere_action"] = "我喜欢你"
s.History["cute_action"] = "摸摸头"
return nil
})
switch input["role"] {
case "tsundere":
return map[string]string{
"role": "傲娇",
"content": input["content"],
}, nil
case "cute":
return map[string]string{
"role": "可爱",
"content": input["content"],
}, nil
default:
return map[string]string{
"role": "user",
"content": input["content"],
}, nil
}
})
tsundereLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output []*schema.Message, err error) {
return []*schema.Message{
{
Role: schema.System,
Content: "你是一个高冷傲娇的大小姐",
},
{
Role: schema.System,
Content: input["content"],
},
}, nil
})
cuteLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output []*schema.Message, err error) {
_ = compose.ProcessState(ctx, func(ctx context.Context, s *State) error {
input["content"] = input["content"] + s.History["cute_action"].(string)
return nil
})
return []*schema.Message{
{
Role: schema.System,
Content: "你是一个可爱的女高中生",
},
{
Role: schema.System,
Content: input["content"],
},
}, nil
})
tsundereStateToInput := func(ctx context.Context, in map[string]string, state *State) (output map[string]string, err error) {
in["content"] = in["content"] + state.History["tsundere_action"].(string)
return in, nil
}
_ = graph.AddLambdaNode("lambda", lambda)
_ = graph.AddLambdaNode("tsundereLambda", tsundereLambda, compose.WithStatePreHandler(tsundereStateToInput))
_ = graph.AddLambdaNode("cuteLambda", cuteLambda)
graphBranch := compose.NewGraphBranch(func(ctx context.Context, in map[string]string) (endNode string, err error) {
switch in["role"] {
case "傲娇":
return "tsundereLambda", nil
case "可爱":
return "cuteLambda", nil
default:
return "tsundereLambda", nil
}
}, map[string]bool{
"tsundereLambda": true,
"cuteLambda": true,
})
_ = graph.AddBranch("lambda", graphBranch)
_ = graph.AddEdge(compose.START, "lambda")
_ = graph.AddEdge("tsundereLambda", "model")
_ = graph.AddEdge("cuteLambda", "model")
_ = graph.AddEdge("model", compose.END)
r, _ := graph.Compile(ctx)
output, _ := r.Invoke(ctx, map[string]string{
"role": "tsundere",
"content": "你好啊",
})
fmt.Println(output.Content)
}GraphWithCallback
核心概念串起来,就是:Eino 中的 Component 和 Graph 等实体,在固定的时机 (Callback Timing),回调用户提供的 function (Callback Handler),并把自己是谁 (RunInfo),以及当时发生了什么 (Callback Input & Output) 传出去。
RunInfo 结构体
type RunInfo struct {
// Name is the graph node name for display purposes, not unique.
// Passed from compose.WithNodeName().
Name string
Type string
Component components.Component
}CallbackInput & CallbackOutput 类型
本质是任意类型,因为不同的 Component 的输入输出、内部状态完全不同。
type CallbackInput any
type CallbackOutput any示例代码
import "github.com/cloudwego/eino/callbacks"
func genCallback() callbacks.Handler {
handlerBuilder := callbacks.NewHandlerBuilder()
handlerBuilder.OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
fmt.Printf("节点:%s,输出:%#v\n", info.Component, input)
return ctx
})
handlerBuilder.OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
fmt.Printf("节点:%s,输出:%+v\n", info.Component, output)
return ctx
})
return handlerBuilder.Build()
}
func main() {
// ...
r, _ := graph.Compile(ctx)
output, _ := r.Invoke(ctx, input, compose.WithCallbacks(genCallback()))
fmt.Println(output.Content)
}GraphWithGraph(图嵌套)
图编排产物 Runnable 与 Lambda 的接口形式非常相似。因此编译好的图可以简单的封装为 Lambda,并以 Lambda 节点的形式嵌套进其他图中。
其实就是将图当作一个特殊的 Lambda 节点嵌入。
注意依旧要保持上下游类型对齐。
示例代码
package main
import (
"context"
"fmt"
"os"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
)
func main() {
_ = godotenv.Load(".env")
ctx := context.Background()
model, _ := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("MODEL"),
})
graph := compose.NewGraph[map[string]string, *schema.Message]()
_ = graph.AddChatModelNode("model", model)
lambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output map[string]string, err error) {
switch input["role"] {
case "tsundere":
return map[string]string{
"role": "傲娇",
"content": input["content"],
}, nil
case "cute":
return map[string]string{
"role": "可爱",
"content": input["content"],
}, nil
default:
return map[string]string{
"role": "user",
"content": input["content"],
}, nil
}
})
tsundereLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output []*schema.Message, err error) {
return []*schema.Message{
schema.SystemMessage("你是一个高冷傲娇的大小姐"),
schema.UserMessage(input["content"]),
}, nil
})
cuteLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output []*schema.Message, err error) {
return []*schema.Message{
schema.SystemMessage("你是一个可爱的女高中生"),
schema.UserMessage(input["content"]),
}, nil
})
_ = graph.AddLambdaNode("lambda", lambda)
_ = graph.AddLambdaNode("tsundereLambda", tsundereLambda)
_ = graph.AddLambdaNode("cuteLambda", cuteLambda)
graphBranch := compose.NewGraphBranch(func(ctx context.Context, in map[string]string) (endNode string, err error) {
switch in["role"] {
case "傲娇":
return "tsundereLambda", nil
case "可爱":
return "cuteLambda", nil
default:
return "tsundereLambda", nil
}
}, map[string]bool{
"tsundereLambda": true,
"cuteLambda": true,
})
_ = graph.AddBranch("lambda", graphBranch)
_ = graph.AddEdge(compose.START, "lambda")
_ = graph.AddEdge("tsundereLambda", "model")
_ = graph.AddEdge("cuteLambda", "model")
_ = graph.AddEdge("model", compose.END)
// 外层图
outerGraph := compose.NewGraph[map[string]string, string]()
outerLambda1 := compose.InvokableLambda(func(ctx context.Context, input map[string]string) (output map[string]string, err error) {
return input, nil
})
outerLambda2 := compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (output string, err error) {
return input.Content + "\n+++ END +++", nil
})
_ = outerGraph.AddGraphNode("innerGraph", graph)
_ = outerGraph.AddLambdaNode("outerLambda1", outerLambda1)
_ = outerGraph.AddLambdaNode("outerLambda2", outerLambda2)
_ = outerGraph.AddEdge(compose.START, "outerLambda1")
_ = outerGraph.AddEdge("outerLambda1", "innerGraph")
_ = outerGraph.AddEdge("innerGraph", "outerLambda2")
_ = outerGraph.AddEdge("outerLambda2", compose.END)
input := map[string]string{
"role": "cute",
"content": "你好啊",
}
r, _ := outerGraph.Compile(ctx)
output, _ := r.Invoke(ctx, input)
fmt.Println(output)
}CozeLoop
字节官方为 CozeLoop 实现了 Trace 回调,可以与 Eino 的应用无缝集成增强可观测能力。
package main
import (
ccb "github.com/cloudwego/eino-ext/callbacks/cozeloop"
"github.com/cloudwego/eino/callbacks"
"github.com/coze-dev/cozeloop-go"
"github.com/joho/godotenv"
)
func main() {
_ = godotenv.Load(".env")
ctx := context.Background()
// 设置相关环境变量 COZELOOP_WORKSPACE_ID、COZELOOP_API_TOKEN
client, err := cozeloop.NewClient()
if err != nil {
panic(err)
}
defer client.Close(ctx)
// 在服务 init 时 once 调用
handler := ccb.NewLoopHandler(client)
callbacks.AppendGlobalHandlers(handler)
// ...
}