How to stream
Streaming is critical in making applications based on LLMs feel responsive to end-users.
Important LangChain primitives like LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.
This interface provides two general approaches to stream content:
- .stream(): a default implementation of streaming that streams the final output from the chain.
- streamEvents() and streamLog(): these provide a way to stream both intermediate steps and final output from the chain.
Let’s take a look at both approaches!
Using Stream
All Runnable objects implement a method called stream.
This method is designed to stream the final output in chunks, yielding each chunk as soon as it is available.
Streaming is only possible if all steps in the program know how to process an input stream; i.e., process an input chunk one at a time, and yield a corresponding output chunk.
The complexity of this processing can vary, from straightforward tasks like emitting tokens produced by an LLM, to more challenging ones like streaming parts of JSON results before the entire JSON is complete.
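To make the idea of processing "an input chunk one at a time" concrete, here is a minimal, dependency-free TypeScript sketch (not LangChain code). It assumes a step can be modeled as an async generator that reads an input stream and yields one output chunk per input chunk; fakeTokenStream, upperCaseStep and collect are hypothetical names used only for illustration.

```typescript
// Hypothetical stand-in for a model: yields tokens one at a time with a delay.
async function* fakeTokenStream(tokens: string[]): AsyncGenerator<string> {
  for (const token of tokens) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield token;
  }
}

// A streaming-aware step: transforms each chunk as soon as it arrives,
// instead of waiting for the whole input to be available.
async function* upperCaseStep(input: AsyncIterable<string>): AsyncGenerator<string> {
  for await (const chunk of input) {
    yield chunk.toUpperCase();
  }
}

// Drains a stream into an array (only needed for the demo and checks).
async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of stream) {
    items.push(item);
  }
  return items;
}

collect(upperCaseStep(fakeTokenStream(["Hello", "!", " I'm", " an", " AI"]))).then((chunks) => {
  console.log(chunks.join("|"));
});
```

Because upperCaseStep never buffers, placing it after a streaming source keeps the pipeline streaming; a step that waited for the whole input would not.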
The best place to start exploring streaming is with the single most important component in LLM apps: the models themselves!
LLMs and Chat Models
Large language models can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.
The key strategy to make the application feel more responsive is to show intermediate progress; e.g., to stream the output from the model token by token.
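A rough way to see why streaming helps is to compare time-to-first-chunk with total generation time. The sketch below simulates a slow model with a hypothetical slowModel async generator; the fixed per-token delay is made up and stands in for real generation time, so treat this as an illustration rather than a benchmark.

```typescript
// Hypothetical slow "model": each token takes msPerToken to appear.
async function* slowModel(tokens: string[], msPerToken: number): AsyncGenerator<string> {
  for (const token of tokens) {
    await new Promise((resolve) => setTimeout(resolve, msPerToken));
    yield token;
  }
}

// Records when the first chunk arrived and when the whole response finished.
async function measure(tokens: string[], msPerToken: number) {
  const start = Date.now();
  let firstChunkMs = -1;
  let text = "";
  for await (const chunk of slowModel(tokens, msPerToken)) {
    if (firstChunkMs < 0) {
      firstChunkMs = Date.now() - start;
    }
    text += chunk; // a UI would render the partial text here
  }
  return { firstChunkMs, totalMs: Date.now() - start, text };
}

measure(["Hello", "!", " How", " can", " I", " help", "?"], 50).then((r) => {
  console.log(`first chunk after ~${r.firstChunkMs} ms, full text after ~${r.totalMs} ms`);
});
```

With streaming, the user can start reading after the first delay instead of after the sum of all delays.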
import "dotenv/config";
Pick your chat model:
- OpenAI
- Anthropic
- FireworksAI
- MistralAI
- Groq
- VertexAI
OpenAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/openai
yarn add @langchain/openai
pnpm add @langchain/openai
Add environment variables
OPENAI_API_KEY=your-api-key
Instantiate the model
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "gpt-3.5-turbo",
temperature: 0
});
Anthropic
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/anthropic
yarn add @langchain/anthropic
pnpm add @langchain/anthropic
Add environment variables
ANTHROPIC_API_KEY=your-api-key
Instantiate the model
import { ChatAnthropic } from "@langchain/anthropic";
const model = new ChatAnthropic({
model: "claude-3-sonnet-20240229",
temperature: 0
});
FireworksAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/community
yarn add @langchain/community
pnpm add @langchain/community
Add environment variables
FIREWORKS_API_KEY=your-api-key
Instantiate the model
import { ChatFireworks } from "@langchain/community/chat_models/fireworks";
const model = new ChatFireworks({
model: "accounts/fireworks/models/firefunction-v1",
temperature: 0
});
MistralAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/mistralai
yarn add @langchain/mistralai
pnpm add @langchain/mistralai
Add environment variables
MISTRAL_API_KEY=your-api-key
Instantiate the model
import { ChatMistralAI } from "@langchain/mistralai";
const model = new ChatMistralAI({
model: "mistral-large-latest",
temperature: 0
});
Groq
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/groq
yarn add @langchain/groq
pnpm add @langchain/groq
Add environment variables
GROQ_API_KEY=your-api-key
Instantiate the model
import { ChatGroq } from "@langchain/groq";
const model = new ChatGroq({
model: "mixtral-8x7b-32768",
temperature: 0
});
VertexAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/google-vertexai
yarn add @langchain/google-vertexai
pnpm add @langchain/google-vertexai
Add environment variables
GOOGLE_APPLICATION_CREDENTIALS=credentials.json
Instantiate the model
import { ChatVertexAI } from "@langchain/google-vertexai";
const model = new ChatVertexAI({
model: "gemini-1.5-pro",
temperature: 0
});
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "gpt-4o",
});
const stream = await model.stream("Hello! Tell me about yourself.");
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
console.log(`${chunk.content}|`);
}
|
Hello|
!|
I'm|
an|
AI|
language|
model|
created|
by|
Open|
AI|
,|
designed|
to|
assist|
with|
a|
wide|
range|
of|
tasks|
by|
understanding|
and|
generating|
human|
-like|
text|
based|
on|
the|
input|
I|
receive|
.|
I|
can|
help|
answer|
questions|
,|
provide|
explanations|
,|
offer|
advice|
,|
write|
creatively|
,|
and|
much|
more|
.|
How|
can|
I|
assist|
you|
today|
?|
|
Let’s have a look at one of the raw chunks:
chunks[0];
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.
Message chunks are additive by design: one can simply add them up using the .concat() method to get the state of the response so far!
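The additive behavior can be illustrated with a simplified stand-in for AIMessageChunk. TextChunk below is a hypothetical class that models only the content field; the real AIMessageChunk also carries fields such as tool_call_chunks and response_metadata, which this sketch ignores.

```typescript
// Hypothetical, simplified stand-in for AIMessageChunk: only `content` is modeled.
class TextChunk {
  readonly content: string;

  constructor(content: string) {
    this.content = content;
  }

  // Adding two chunks yields a new chunk with the combined content,
  // leaving both inputs unchanged.
  concat(other: TextChunk): TextChunk {
    return new TextChunk(this.content + other.content);
  }
}

// Contents of the first five streamed chunks from the example response.
const streamed = ["", "Hello", "!", " I'm", " an"].map((c) => new TextChunk(c));

// Fold the chunks together to get the state of the response so far.
const soFar = streamed.reduce((acc, chunk) => acc.concat(chunk), new TextChunk(""));
console.log(soFar.content); // Hello! I'm an
```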
let finalChunk = chunks[0];
for (const chunk of chunks.slice(1, 5)) {
finalChunk = finalChunk.concat(chunk);
}
finalChunk;
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Hello! I'm an",
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_call_chunks: [],
tool_calls: [],
invalid_tool_calls: []
},
lc_namespace: [ "langchain_core", "messages" ],
content: "Hello! I'm an",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
Chains
Virtually all LLM applications involve more steps than just a call to a language model.
Let’s build a simple chain using LangChain Expression Language (LCEL) that combines a prompt, model and a parser and verify that streaming works.
We will use StringOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.
LCEL is a declarative way to specify a “program” by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of stream, allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.
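Conceptually, a piped chain can stream because each step consumes the previous step's output stream and produces its own. The following dependency-free sketch assumes each step is a function from one async iterable to another; pipe, promptStep, fakeModel and parserStep are hypothetical names, and this is not LangChain's actual implementation of .pipe().

```typescript
// Hypothetical message chunk type: only the text content is modeled.
type MsgChunk = { content: string };

// A step maps an input stream to an output stream.
type Step<I, O> = (input: AsyncIterable<I>) => AsyncIterable<O>;

// Compose two steps so the output stream of `first` feeds `second`.
function pipe<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return (input) => second(first(input));
}

// "Prompt": formats each input into a prompt string.
const promptStep: Step<{ topic: string }, string> = async function* (input) {
  for await (const { topic } of input) {
    yield `Tell me a joke about ${topic}`;
  }
};

// "Model": a fake model that streams the prompt back, word by word.
const fakeModel: Step<string, MsgChunk> = async function* (input) {
  for await (const promptText of input) {
    const pieces = promptText.split(" ").map((w, i) => (i === 0 ? w : ` ${w}`));
    for (const piece of pieces) {
      yield { content: piece };
    }
  }
};

// "Parser": like a string output parser, extracts the content of each chunk.
const parserStep: Step<MsgChunk, string> = async function* (input) {
  for await (const chunk of input) {
    yield chunk.content;
  }
};

const chain = pipe(pipe(promptStep, fakeModel), parserStep);

// Wraps a single value as a one-item stream.
async function* once<T>(value: T): AsyncGenerator<T> {
  yield value;
}

async function run(): Promise<string[]> {
  const tokens: string[] = [];
  for await (const token of chain(once({ topic: "parrot" }))) {
    tokens.push(token);
    console.log(`${token}|`);
  }
  return tokens;
}

run();
```

Because every step yields as it reads, the first token reaches the consumer before the fake model has produced the last one.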
import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";
const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");
const parser = new StringOutputParser();
const chain = prompt.pipe(model).pipe(parser);
const stream = await chain.stream({
topic: "parrot",
});
for await (const chunk of stream) {
console.log(`${chunk}|`);
}
|
Sure|
!|
Here's|
a|
par|
rot|
joke|
for|
you|
:
|
Why|
did|
the|
par|
rot|
get|
a|
job|
?
|
Because|
he|
was|
tired|
of|
being|
"|
pol|
ly|
-em|
ployment|
!"|
🎉|
🦜|
|
You do not have to use the LangChain Expression Language to use LangChain and can instead rely on a standard imperative programming approach by calling invoke, batch or stream on each component individually, assigning the results to variables and then using them downstream as you see fit.
If that works for your needs, then that’s fine by us 👌!
Working with Input Streams
What if you wanted to stream JSON from the output as it was being generated?
If you were to rely on JSON.parse to parse the partial JSON, the parsing would fail, as the partial JSON wouldn’t be valid JSON.
You’d likely be at a complete loss as to what to do and claim that it wasn’t possible to stream JSON.
Well, it turns out there is a way to do it: the parser needs to operate on the input stream and attempt to “auto-complete” the partial JSON into a valid state.
Let’s see such a parser in action to understand what this means.
import { JsonOutputParser } from "@langchain/core/output_parsers";
const chain = model.pipe(new JsonOutputParser());
const stream = await chain.stream(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
{
countries: [
{ name: "France", population: 67372000 },
{ name: "Spain", population: 47450795 },
{ name: "Japan", population: 125960000 }
]
}
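One way to implement the “auto-complete” idea is to close any open string, object or array in the partial text and, if that still doesn't parse, retry on progressively shorter prefixes. The sketch below is a simplified, assumed approach for illustration, not the JsonOutputParser source; completeJson and parsePartialJson are hypothetical helpers.

```typescript
// Closes an unterminated string and any open objects/arrays in `text`.
function completeJson(text: string): string {
  const closers: string[] = [];
  let inString = false;
  let escaped = false;
  for (const ch of text) {
    if (inString) {
      if (escaped) {
        escaped = false;
      } else if (ch === "\\") {
        escaped = true;
      } else if (ch === '"') {
        inString = false;
      }
    } else if (ch === '"') {
      inString = true;
    } else if (ch === "{") {
      closers.push("}");
    } else if (ch === "[") {
      closers.push("]");
    } else if (ch === "}" || ch === "]") {
      closers.pop();
    }
  }
  let completed = text;
  if (inString) {
    // Drop a dangling backslash so the added quote is not escaped.
    if (escaped) {
      completed = completed.slice(0, -1);
    }
    completed += '"';
  }
  return completed + closers.reverse().join("");
}

// Returns the value of the longest prefix of `text` that can be completed
// into valid JSON, or undefined if no prefix works.
function parsePartialJson(text: string): unknown {
  for (let end = text.length; end > 0; end--) {
    try {
      return JSON.parse(completeJson(text.slice(0, end)));
    } catch {
      // Not parseable yet (e.g. a dangling key or comma): try a shorter prefix.
    }
  }
  return undefined;
}

// Simulate a response arriving in pieces and parse each prefix.
const fullResponse = '{"countries": [{"name": "France", "population": 67372000}]}';
for (const end of [12, 24, 36, fullResponse.length]) {
  console.log(JSON.stringify(parsePartialJson(fullResponse.slice(0, end))));
}
```

In this sketch a truncated number such as 673 parses as a complete number, so intermediate values can change as more tokens arrive. Retrying every prefix is quadratic in the worst case, which is acceptable for a sketch but worth keeping in mind.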
Now, let’s break streaming. We’ll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON. Since this new last step is just a function call with no defined streaming behavior, the streaming output from previous steps is aggregated, then passed as a single input to the function.
Any steps in the chain that operate on finalized inputs rather than on input streams can break streaming functionality via stream.
Later, we will discuss the streamEvents API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.
// A function that operates on finalized inputs rather than on an input stream.
// Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
if (!Array.isArray(inputs.countries)) {
return "";
}
return JSON.stringify(inputs.countries.map((country) => country.name));
};
const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);
const stream = await chain.stream(
`output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
["France","Spain","Japan"]
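The difference between a step that operates on an input stream and one that needs the finalized input can be sketched without any library. In this hypothetical example, passThrough forwards chunks as they arrive, while extractNames (a stand-in for extractCountryNames) has to buffer the whole stream before it can parse it, so its consumer receives exactly one chunk.

```typescript
// Hypothetical model output, split into the chunks a model might stream.
async function* jsonChunks(): AsyncGenerator<string> {
  const pieces = ['{"countries": [', '{"name": "France"}', ", ", '{"name": "Spain"}', "]}"];
  for (const piece of pieces) {
    yield piece;
  }
}

// Operates on the input stream: each chunk is forwarded immediately.
async function* passThrough(input: AsyncIterable<string>): AsyncGenerator<string> {
  for await (const chunk of input) {
    yield chunk;
  }
}

// Operates on the finalized input: it must buffer everything before parsing,
// so whoever consumes it gets a single chunk at the very end.
async function* extractNames(input: AsyncIterable<string>): AsyncGenerator<string> {
  let full = "";
  for await (const chunk of input) {
    full += chunk;
  }
  const parsed = JSON.parse(full) as { countries: { name: string }[] };
  yield JSON.stringify(parsed.countries.map((country) => country.name));
}

async function collect(stream: AsyncIterable<string>): Promise<string[]> {
  const out: string[] = [];
  for await (const chunk of stream) {
    out.push(chunk);
  }
  return out;
}

collect(passThrough(jsonChunks())).then((c) => console.log("streaming step chunks:", c.length));
collect(extractNames(jsonChunks())).then((c) => console.log("aggregating step chunks:", c.length, c[0]));
```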
Non-streaming components
Like the above example, some built-in components like Retrievers do not offer any streaming. What happens if we try to stream them?
import { OpenAIEmbeddings } from "@langchain/openai";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatPromptTemplate } from "@langchain/core/prompts";
const template = `Answer the question based only on the following context:
{context}
Question: {question}
`;
const prompt = ChatPromptTemplate.fromTemplate(template);
const vectorstore = await MemoryVectorStore.fromTexts(
["mitochondria is the powerhouse of the cell", "buildings are made of brick"],
[{}, {}],
new OpenAIEmbeddings()
);
const retriever = vectorstore.asRetriever();
const chunks = [];
for await (const chunk of await retriever.stream(
"What is the powerhouse of the cell?"
)) {
chunks.push(chunk);
}
console.log(chunks);
[
[
Document {
pageContent: "mitochondria is the powerhouse of the cell",
metadata: {}
},
Document {
pageContent: "buildings are made of brick",
metadata: {}
}
]
]
Stream just yielded the final result from that component.
This is OK! Not all components have to implement streaming – in some cases streaming is either unnecessary, difficult or just doesn’t make sense.
An LCEL chain constructed using some non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.
Here’s an example of this:
import {
RunnablePassthrough,
RunnableSequence,
} from "@langchain/core/runnables";
import type { Document } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";
const formatDocs = (docs: Document[]) => {
return docs.map((doc) => doc.pageContent).join("\n-----\n");
};
const retrievalChain = RunnableSequence.from([
{
context: retriever.pipe(formatDocs),
question: new RunnablePassthrough(),
},
prompt,
model,
new StringOutputParser(),
]);
const stream = await retrievalChain.stream(
"What is the powerhouse of the cell?"
);
for await (const chunk of stream) {
console.log(`${chunk}|`);
}
|
M|
ito|
ch|
ond|
ria|
is|
the|
powerhouse|
of|
the|
cell|
.|
|
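The same pattern can be sketched with plain async functions: a hypothetical fakeRetriever resolves all documents at once, and only after it finishes does a hypothetical fakeModel start streaming. Nothing reaches the consumer until retrieval completes, but from then on output arrives chunk by chunk.

```typescript
type Doc = { pageContent: string };

// Hypothetical non-streaming retriever: resolves all documents at once.
async function fakeRetriever(_query: string): Promise<Doc[]> {
  await new Promise((resolve) => setTimeout(resolve, 20)); // simulated search latency
  return [
    { pageContent: "mitochondria is the powerhouse of the cell" },
    { pageContent: "buildings are made of brick" },
  ];
}

// Hypothetical streaming model: "answers" with the first context line, word by word.
async function* fakeModel(promptText: string): AsyncGenerator<string> {
  const firstContextLine = promptText.split("\n")[1] ?? "";
  for (const word of firstContextLine.split(" ")) {
    yield `${word} `;
  }
}

async function* retrievalChain(question: string): AsyncGenerator<string> {
  // Non-streaming step: nothing can be emitted until retrieval finishes.
  const docs = await fakeRetriever(question);
  const context = docs.map((doc) => doc.pageContent).join("\n-----\n");
  const promptText = `Context:\n${context}\nQuestion: ${question}`;
  // Streaming step: every token is forwarded as soon as it is produced.
  yield* fakeModel(promptText);
}

async function main(): Promise<string[]> {
  const chunks: string[] = [];
  for await (const chunk of retrievalChain("What is the powerhouse of the cell?")) {
    chunks.push(chunk);
    console.log(`${chunk}|`);
  }
  return chunks;
}

main();
```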
Now that we’ve seen how the stream method works, let’s venture into the world of streaming events!
Using Stream Events
Event Streaming is a beta API. This API may change a bit based on feedback.
Introduced in @langchain/core 0.1.27.
For the streamEvents method to work properly:
- Any custom functions / runnables must propagate callbacks.
- Set proper parameters on models to force the LLM to stream tokens.
- Let us know if anything doesn’t work as expected!
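Propagating callbacks matters because events are reported through the configuration a runnable is called with; in LangChain.js this usually means passing along the config object a custom function receives, so nested runnables report to the same callbacks. The dependency-free sketch below models the idea with a hypothetical RunConfig type holding a list of callbacks: goodCustomFn forwards it, badCustomFn drops it, and the child's events disappear.

```typescript
// Hypothetical types: a callback receives event descriptions.
type EventCallback = (event: string) => void;
type RunConfig = { callbacks: EventCallback[] };

// A child step that reports an event to every callback in its config.
function childStep(input: string, config?: RunConfig): string {
  config?.callbacks.forEach((callback) => callback(`child_step saw "${input}"`));
  return input.toUpperCase();
}

// Propagates the config it was given, so the child's event is observed.
function goodCustomFn(input: string, config?: RunConfig): string {
  return childStep(input, config);
}

// Drops the config, so the child runs without any callbacks.
function badCustomFn(input: string, _config?: RunConfig): string {
  return childStep(input);
}

// Runs a function with a recording callback and returns what it recorded.
function recordEvents(fn: (input: string, config?: RunConfig) => string, input: string): string[] {
  const events: string[] = [];
  fn(input, { callbacks: [(event) => events.push(event)] });
  return events;
}

console.log(recordEvents(goodCustomFn, "hello")); // one recorded event
console.log(recordEvents(badCustomFn, "hello")); // nothing recorded
```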
Event Reference
Below is a reference table that shows some events that might be emitted by the various Runnable objects.
When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed. This means that inputs will often be included only for end events rather than for start events.
event | name | chunk | input | output |
---|---|---|---|---|
on_llm_start | [model name] | | {"input": "hello"} | |
on_llm_stream | [model name] | "Hello" or AIMessageChunk(content="hello") | | |
on_llm_end | [model name] | | "Hello human!" | {"generations": […], "llmOutput": None, …} |
on_chain_start | format_docs | | | |
on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
on_chain_end | format_docs | | [Document(…)] | "hello world!, goodbye world!" |
on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
on_tool_stream | some_tool | {"x": 1, "y": "2"} | | |
on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
on_retriever_start | [retriever name] | | {"query": "hello"} | |
on_retriever_chunk | [retriever name] | {documents: […]} | | |
on_retriever_end | [retriever name] | | {"query": "hello"} | {documents: […]} |
on_prompt_start | [template_name] | | {"question": "hello"} | |
on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, …]) |
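The “inputs only on end events” behavior in the table follows directly from streaming. This hypothetical withEvents wrapper (not LangChain's implementation) emits start, stream and end events around a per-chunk step, reusing the on_chain_* event names from the table for readability. Because the step only ever sees one chunk at a time, the full input can only be reported once the input stream is exhausted.

```typescript
type StreamEvent = {
  event: "on_chain_start" | "on_chain_stream" | "on_chain_end";
  name: string;
  data: { chunk?: string; input?: string; output?: string };
};

// Hypothetical wrapper that reports events around a per-chunk step.
async function* withEvents(
  name: string,
  step: (chunk: string) => string,
  input: AsyncIterable<string>
): AsyncGenerator<StreamEvent> {
  // The input stream has not been read yet, so there is no input to report.
  yield { event: "on_chain_start", name, data: {} };
  let fullInput = "";
  let fullOutput = "";
  for await (const chunk of input) {
    fullInput += chunk;
    const out = step(chunk);
    fullOutput += out;
    yield { event: "on_chain_stream", name, data: { chunk: out } };
  }
  // Only now, with the input stream exhausted, is the complete input known.
  yield { event: "on_chain_end", name, data: { input: fullInput, output: fullOutput } };
}

async function* fromArray(items: string[]): AsyncGenerator<string> {
  for (const item of items) {
    yield item;
  }
}

async function collectEvents(): Promise<StreamEvent[]> {
  const events: StreamEvent[] = [];
  const stream = withEvents("upper", (c) => c.toUpperCase(), fromArray(["hello", " world"]));
  for await (const ev of stream) {
    events.push(ev);
  }
  return events;
}

collectEvents().then((events) => {
  for (const ev of events) {
    console.log(ev.event, JSON.stringify(ev.data));
  }
});
```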
Chat Model
Let’s start off by looking at the events produced by a chat model.
const events = [];
const eventStream = await model.streamEvents("hello", { version: "v1" });
for await (const event of eventStream) {
events.push(event);
}
13
Hey, what’s that funny { version: "v1" } parameter in the API?! 😾
This is a beta API, and we’re almost certainly going to make some changes to it.
This version parameter will allow us to minimize such breaking changes to your code.
In short, we are annoying you now, so we don’t have to annoy you later.
Let’s take a look at a few of the start events and a few of the end events.
events.slice(0, 3);
[
{
run_id: "3394874b-6a19-4d2c-a80f-bd3ff7f25e85",
event: "on_llm_start",
name: "ChatOpenAI",
tags: [],
metadata: {},
data: { input: "hello" }
},
{
event: "on_llm_stream",
run_id: "3394874b-6a19-4d2c-a80f-bd3ff7f25e85",
tags: [],
metadata: {},
name: "ChatOpenAI",
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
},
{
event: "on_llm_stream",
run_id: "3394874b-6a19-4d2c-a80f-bd3ff7f25e85",
tags: [],
metadata: {},
name: "ChatOpenAI",
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Hello",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "Hello",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
]
events.slice(-2);
[
{
event: "on_llm_stream",
run_id: "3394874b-6a19-4d2c-a80f-bd3ff7f25e85",
tags: [],
metadata: {},
name: "ChatOpenAI",
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: "stop" },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
},
{
event: "on_llm_end",
name: "ChatOpenAI",
run_id: "3394874b-6a19-4d2c-a80f-bd3ff7f25e85",
tags: [],
metadata: {},
data: { output: { generations: [ [Array] ] } }
}
]
Chain
Let’s revisit the example chain that parsed streaming JSON to explore the streaming events API.
const chain = model.pipe(new JsonOutputParser());
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" }
);
const events = [];
for await (const event of eventStream) {
events.push(event);
}
84
If you examine the first few events, you’ll notice that there are 3 different start events rather than 2.
The three start events correspond to:
- The chain (model + parser)
- The model
- The parser
events.slice(0, 3);
[
{
run_id: "289af8b8-7047-44e6-a475-26b88ddc7e34",
event: "on_chain_start",
name: "RunnableSequence",
tags: [],
metadata: {},
data: {
input: "Output a list of the countries france, spain and japan and their populations in JSON format. Use a d"... 129 more characters
}
},
{
event: "on_llm_start",
name: "ChatOpenAI",
run_id: "d43b539d-23ae-42ad-9bec-64faf58cf423",
tags: [ "seq:step:1" ],
metadata: {},
data: { input: { messages: [ [Array] ] } }
},
{
event: "on_parser_start",
name: "JsonOutputParser",
run_id: "91b6f786-0838-4888-8c2d-25ecd5d62d47",
tags: [ "seq:step:2" ],
metadata: {},
data: {}
}
]
What do you think you’d see if you looked at the last 3 events? What about the middle?
Let’s use this API to output the stream events from the model and the parser. We’re ignoring start events, end events and events from the chain.
let eventCount = 0;
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" }
);
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 30) {
continue;
}
const eventType = event.event;
if (eventType === "on_llm_stream") {
console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
} else if (eventType === "on_parser_stream") {
console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
}
eventCount += 1;
}
Chat model chunk:
Chat model chunk: ```
Chat model chunk: json
Chat model chunk:
Chat model chunk: {
Chat model chunk:
Chat model chunk: "
Chat model chunk: countries
Chat model chunk: ":
Chat model chunk: [
Chat model chunk:
Chat model chunk: {
Chat model chunk:
Chat model chunk: "
Chat model chunk: name
Chat model chunk: ":
Chat model chunk: "
Chat model chunk: France
Chat model chunk: ",
Chat model chunk:
Chat model chunk: "
Chat model chunk: population
Chat model chunk: ":
Chat model chunk:
Chat model chunk: 673
Chat model chunk: 480
Chat model chunk: 00
Chat model chunk:
Because both the model and the parser support streaming, we see streaming events from both components in real time! Neat! 🦜
Filtering Events
Because this API produces so many events, it is useful to be able to filter on events.
You can filter by either component name, component tags or component type.
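The filtering logic can be pictured as a predicate over events. The sketch below is an assumed, simplified model of includeNames, includeTypes and includeTags, applied to plain objects shaped like the events printed in this guide. Deriving the type from the event string (on_llm_stream to llm) and keeping an event when it matches any of the provided filters are assumptions of this sketch, not documented LangChain behavior.

```typescript
// Plain event objects shaped like the ones printed in this guide.
type Ev = { event: string; name: string; tags: string[] };
type EventFilter = { includeNames?: string[]; includeTypes?: string[]; includeTags?: string[] };

// Assumed convention: "on_llm_stream" -> "llm", "on_parser_start" -> "parser".
function eventType(ev: Ev): string {
  return ev.event.split("_")[1] ?? "";
}

// Keeps everything when no filter is set; otherwise keeps an event that
// matches at least one of the provided filters.
function matches(ev: Ev, filter: EventFilter): boolean {
  const { includeNames, includeTypes, includeTags } = filter;
  if (!includeNames && !includeTypes && !includeTags) return true;
  return (
    (includeNames?.includes(ev.name) ?? false) ||
    (includeTypes?.includes(eventType(ev)) ?? false) ||
    (includeTags ? ev.tags.some((tag) => includeTags.includes(tag)) : false)
  );
}

const sample: Ev[] = [
  { event: "on_chain_start", name: "RunnableSequence", tags: ["my_chain"] },
  { event: "on_llm_stream", name: "model", tags: ["seq:step:1", "my_chain"] },
  { event: "on_parser_stream", name: "my_parser", tags: ["seq:step:2", "my_chain"] },
];

console.log(sample.filter((ev) => matches(ev, { includeNames: ["my_parser"] })).map((ev) => ev.event));
console.log(sample.filter((ev) => matches(ev, { includeTypes: ["llm"] })).map((ev) => ev.event));
console.log(sample.filter((ev) => matches(ev, { includeTags: ["my_chain"] })).length);
```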
By Name
const chain = model
.withConfig({ runName: "model" })
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" },
{ includeNames: ["my_parser"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: "on_parser_start",
name: "my_parser",
run_id: "bd05589a-0725-486b-b814-81af62ba5d80",
tags: [ "seq:step:2" ],
metadata: {},
data: {}
}
{
event: "on_parser_stream",
name: "my_parser",
run_id: "bd05589a-0725-486b-b814-81af62ba5d80",
tags: [ "seq:step:2" ],
metadata: {},
data: {
chunk: {
countries: [
{ name: "France", population: 65273511 },
{ name: "Spain", population: 46754778 },
{ name: "Japan", population: 126476461 }
]
}
}
}
{
event: "on_parser_end",
name: "my_parser",
run_id: "bd05589a-0725-486b-b814-81af62ba5d80",
tags: [ "seq:step:2" ],
metadata: {},
data: {
output: {
countries: [
{ name: "France", population: 65273511 },
{ name: "Spain", population: 46754778 },
{ name: "Japan", population: 126476461 }
]
}
}
}
3
By Type
const chain = model
.withConfig({ runName: "model" })
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" },
{ includeTypes: ["llm"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: "on_llm_start",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
input: { messages: [ [ [HumanMessage] ] ] }
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "Sure",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Sure",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "Sure",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: ",",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: ",",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: ",",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " here's",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " here's",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " here's",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " the",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " the",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " the",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " JSON",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " JSON",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " JSON",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " representation",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " representation",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " representation",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " of",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " of",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " of",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " the",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " the",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " the",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "model",
run_id: "4c7dbe4a-57ea-40b9-9fc0-a77d7851c5fd",
tags: [ "seq:step:1" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " countries",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " countries",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " countries",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
By Tags
Tags are inherited by child components of a given runnable.
If you’re using tags to filter, make sure that this is what you want.
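Tag inheritance can be modeled as each component's events carrying its own tags plus all of its ancestors' tags. In this hypothetical sketch, collectEventTags walks a small tree shaped like the chain used in this section, which is why filtering on my_chain also matches the model and parser events.

```typescript
type Component = { name: string; tags: string[]; children: Component[] };
type TaggedEvent = { name: string; tags: string[] };

// Each component reports its own tags followed by everything it inherited.
function collectEventTags(component: Component, inherited: string[] = []): TaggedEvent[] {
  const tags = [...component.tags, ...inherited];
  return [
    { name: component.name, tags },
    ...component.children.flatMap((child) => collectEventTags(child, tags)),
  ];
}

// Hypothetical tree mirroring the chain in this section: the sequence is
// tagged "my_chain" and its two steps get positional "seq:step:N" tags.
const chainTree: Component = {
  name: "RunnableSequence",
  tags: ["my_chain"],
  children: [
    { name: "ChatOpenAI", tags: ["seq:step:1"], children: [] },
    { name: "my_parser", tags: ["seq:step:2"], children: [] },
  ],
};

const tagged = collectEventTags(chainTree);
// Filtering on the parent's tag also selects every child.
const selected = tagged.filter((ev) => ev.tags.includes("my_chain"));
console.log(selected.map((ev) => `${ev.name}: ${ev.tags.join(", ")}`));
```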
const chain = model
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
.withConfig({ tags: ["my_chain"] });
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" },
{ includeTags: ["my_chain"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
run_id: "83f4ef67-4970-44f7-8ae1-5ebace8cbce0",
event: "on_chain_start",
name: "RunnableSequence",
tags: [ "my_chain" ],
metadata: {},
data: {
input: "Output a list of the countries france, spain and japan and their populations in JSON format. Use a d"... 129 more characters
}
}
{
event: "on_llm_start",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
input: { messages: [ [ [HumanMessage] ] ] }
}
}
{
event: "on_parser_start",
name: "my_parser",
run_id: "346234e7-b109-4bf7-a568-70edd67bc209",
tags: [ "seq:step:2", "my_chain" ],
metadata: {},
data: {}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "```",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "```",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "```",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "json",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "json",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "json",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "\n",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "\n",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "\n",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "{\n",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "{\n",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "{\n",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: " ",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: " ",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: " ",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: ' "',
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: ' "',
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: ' "',
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
{
event: "on_llm_stream",
name: "ChatOpenAI",
run_id: "d84bca89-bc85-4d1d-a7af-3403f5789bd0",
tags: [ "seq:step:1", "my_chain" ],
metadata: {},
data: {
chunk: ChatGenerationChunk {
text: "countries",
generationInfo: { prompt: 0, completion: 0, finish_reason: null },
message: AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "countries",
tool_call_chunks: [],
additional_kwargs: {},
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ "langchain_core", "messages" ],
content: "countries",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: []
}
}
}
}
Streaming events over HTTP
For convenience, streamEvents supports encoding streamed intermediate events as HTTP server-sent events, encoded as bytes. Here's what that looks like (using a TextDecoder to convert the binary data back into a human-readable string):
const chain = model
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
.withConfig({ tags: ["my_chain"] });
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{
version: "v1",
encoding: "text/event-stream",
}
);
let eventCount = 0;
const textDecoder = new TextDecoder();
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 3) {
continue;
}
console.log(textDecoder.decode(event));
eventCount += 1;
}
event: data
data: {"run_id":"9344be82-f4e6-49be-9eea-88eb2ae53340","event":"on_chain_start","name":"RunnableSequence","tags":["my_chain"],"metadata":{},"data":{"input":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\""}}
event: data
data: {"event":"on_llm_start","name":"ChatOpenAI","run_id":"20640210-4b45-4ac3-9e5e-ad6e6d48431f","tags":["seq:step:1","my_chain"],"metadata":{},"data":{"input":{"messages":[[{"lc":1,"type":"constructor","id":["langchain_core","messages","HumanMessage"],"kwargs":{"content":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\"","additional_kwargs":{},"response_metadata":{}}}]]}}}
event: data
data: {"event":"on_parser_start","name":"my_parser","run_id":"0d035118-36bc-49a3-9bdd-5fcf8afcc5da","tags":["seq:step:2","my_chain"],"metadata":{},"data":{}}
event: data
data: {"event":"on_llm_stream","name":"ChatOpenAI","run_id":"20640210-4b45-4ac3-9e5e-ad6e6d48431f","tags":["seq:step:1","my_chain"],"metadata":{},"data":{"chunk":{"text":"","generationInfo":{"prompt":0,"completion":0,"finish_reason":null},"message":{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","tool_call_chunks":[],"additional_kwargs":{},"tool_calls":[],"invalid_tool_calls":[],"response_metadata":{"prompt":0,"completion":0,"finish_reason":null}}}}}}
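Each event in the output above is a server-sent event frame: an event: line, a data: line carrying the event serialized as JSON, and (per the SSE specification) a blank line that ends the frame. The following is a minimal sketch of that framing in plain TypeScript. It assumes the event name is always data, as in the output above. StreamEventLike and encodeAsServerSentEvent are hypothetical names, and this illustrates the wire format only; it is not LangChain's internal encoder.

```typescript
// Hypothetical event shape, mirroring the fields visible in the output above.
interface StreamEventLike {
  event: string;
  name: string;
  run_id: string;
  tags: string[];
  metadata: Record<string, unknown>;
  data: Record<string, unknown>;
}

const sseEncoder = new TextEncoder();

// Encode one event as an SSE frame: "event:" line, "data:" line, blank line.
// JSON.stringify escapes newlines inside strings, so the whole payload
// always fits on a single "data:" line.
function encodeAsServerSentEvent(ev: StreamEventLike): Uint8Array {
  return sseEncoder.encode(`event: data\ndata: ${JSON.stringify(ev)}\n\n`);
}

// Usage: encode one event, then decode it back the way the loop above does.
const bytes = encodeAsServerSentEvent({
  event: "on_parser_start",
  name: "my_parser",
  run_id: "example-run-id",
  tags: ["seq:step:2", "my_chain"],
  metadata: {},
  data: {},
});
console.log(new TextDecoder().decode(bytes));
```

Because every payload is a single line of JSON, a client only needs to split on blank lines and strip the data: prefix to recover each event object.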
A nice feature of this format is that you can pass the resulting stream directly into a native HTTP Response object with the correct headers (the Web-standard Response that frameworks like Hono and Next.js use), then parse that stream on the frontend. Your server-side handler would look something like this:
const handler = async () => {
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{
version: "v1",
encoding: "text/event-stream",
}
);
return new Response(eventStream, {
headers: {
"content-type": "text/event-stream",
},
});
};
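The handler works because the Web-standard Response accepts a ReadableStream of bytes as its body, so the encoded event stream can be returned as-is. The sketch below shows that in isolation, without a model. It assumes a runtime with global Response, ReadableStream, and TextEncoder (for example Node 18+ or an edge runtime). fakeEventStream is a hypothetical stand-in for chain.streamEvents(..., { encoding: "text/event-stream" }), and the cache-control header is an optional, commonly used addition that the handler above does not set.

```typescript
// Wrap any byte stream of SSE frames in a standard Response with SSE headers.
function toEventStreamResponse(body: ReadableStream<Uint8Array>): Response {
  return new Response(body, {
    headers: {
      "content-type": "text/event-stream",
      // Optional (assumption): discourages caching of the live stream.
      "cache-control": "no-cache",
    },
  });
}

// Hypothetical stand-in for the stream returned by streamEvents:
// it emits two pre-encoded SSE frames and then closes.
function fakeEventStream(): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const frames = [
    `event: data\ndata: {"event":"on_chain_start"}\n\n`,
    `event: data\ndata: {"event":"on_chain_end"}\n\n`,
  ];
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const frame of frames) controller.enqueue(encoder.encode(frame));
      controller.close();
    },
  });
}

const response = toEventStreamResponse(fakeEventStream());
console.log(response.headers.get("content-type"));
```

In a real route you would pass the stream from streamEvents instead of fakeEventStream().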
And your frontend could look like this (using the @microsoft/fetch-event-source package to fetch and parse the event source):
import { fetchEventSource } from "@microsoft/fetch-event-source";
const makeChainRequest = async () => {
await fetchEventSource("https://your_url_here", {
method: "POST",
body: JSON.stringify({
foo: "bar",
}),
onmessage: (message) => {
if (message.event === "data") {
console.log(message.data);
}
},
onerror: (err) => {
console.log(err);
},
});
};
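If you would rather not add a dependency, the same stream can be consumed with fetch and the Web Streams API. The sketch below is a simplified SSE reader under stated assumptions: frames are separated by a blank line ("\n\n"), it does not handle "\r\n" line endings, comments, or the id/retry fields, and it only forwards frames whose event: line is data, matching the output above. readServerSentEvents and handleFrame are hypothetical helper names, not part of LangChain or @microsoft/fetch-event-source.

```typescript
// Parse one complete frame and forward its data payload if it is a "data" event.
function handleFrame(frame: string, onData: (data: string) => void): void {
  let eventName = "message"; // SSE default when no "event:" line is present
  const dataLines: string[] = [];
  for (const line of frame.split("\n")) {
    if (line.startsWith("event:")) eventName = line.slice(6).trim();
    else if (line.startsWith("data:")) dataLines.push(line.slice(5).replace(/^ /, ""));
  }
  if (eventName === "data" && dataLines.length > 0) onData(dataLines.join("\n"));
}

// Read a text/event-stream body chunk by chunk and call onData per frame.
async function readServerSentEvents(
  body: ReadableStream<Uint8Array>,
  onData: (data: string) => void,
): Promise<void> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across chunks intact.
    buffer += decoder.decode(value, { stream: true });
    // Network chunks need not align with frames: only complete frames are
    // handled, and any incomplete trailing frame stays in the buffer.
    let boundary = buffer.indexOf("\n\n");
    while (boundary !== -1) {
      handleFrame(buffer.slice(0, boundary), onData);
      buffer = buffer.slice(boundary + 2);
      boundary = buffer.indexOf("\n\n");
    }
  }
  buffer += decoder.decode(); // flush any bytes still held by the decoder
  if (buffer.trim()) handleFrame(buffer, onData);
}

// Usage in a browser (URL and request body as in the example above):
// const response = await fetch("https://your_url_here", { method: "POST", body: JSON.stringify({ foo: "bar" }) });
// await readServerSentEvents(response.body!, (data) => console.log(JSON.parse(data)));
```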
Non-streaming components
Remember how some components don’t stream well because they don’t operate on input streams?
While such components can break streaming of the final output when using stream, streamEvents will still yield streaming events from intermediate steps that support streaming!
// A function that operates on finalized inputs rather than on an input stream.
// Because it needs the complete input before it can run, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
if (!Array.isArray(inputs.countries)) {
return "";
}
return JSON.stringify(inputs.countries.map((country) => country.name));
};
const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);
const stream = await chain.stream(
`output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
["France","Spain","Japan"]
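As expected, the stream API no longer streams: because extractCountryNames operates on finalized inputs rather than on an input stream, the chain yields a single chunk containing the complete list only after the model has finished.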
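The difference between a step that operates on finalized inputs and one that operates on an input stream can be seen without LangChain at all. The sketch below uses plain TypeScript async generators. parserOutput is hand-written stand-in data imitating the progressively more complete objects a streaming JSON parser yields while the model is still writing (including truncated names such as "Fra"), and nonStreamingStep, streamingStep, and collect are hypothetical names. It does not show how to plug a generator into a LangChain chain.

```typescript
// Shape of the (partial) objects the stand-in parser yields.
type ParsedChunk = { countries?: { name?: string }[] };

// Stand-in for streamed parser output: each chunk is a more complete
// version of the same object, so names can appear truncated at first.
async function* parserOutput(): AsyncGenerator<ParsedChunk> {
  yield {};
  yield { countries: [{ name: "Fra" }] };
  yield { countries: [{ name: "France" }] };
  yield { countries: [{ name: "France" }, { name: "Sp" }] };
  yield { countries: [{ name: "France" }, { name: "Spain" }, { name: "Japan" }] };
}

// Operates on the finalized input: it must drain the whole stream first,
// so downstream consumers receive exactly one chunk at the very end.
async function* nonStreamingStep(input: AsyncIterable<ParsedChunk>): AsyncGenerator<string[]> {
  let last: ParsedChunk = {};
  for await (const chunk of input) last = chunk;
  yield (last.countries ?? []).flatMap((c) => (c.name ? [c.name] : []));
}

// Operates on the input stream: emits each name once it is known to be complete.
async function* streamingStep(input: AsyncIterable<ParsedChunk>): AsyncGenerator<string> {
  let emitted = 0;
  let latest: { name?: string }[] = [];
  for await (const chunk of input) {
    latest = chunk.countries ?? [];
    // An entry is complete once a later entry has started.
    while (emitted < latest.length - 1) {
      const name = latest[emitted++].name;
      if (name) yield name;
    }
  }
  // The input has ended, so any remaining entries are complete too.
  while (emitted < latest.length) {
    const name = latest[emitted++].name;
    if (name) yield name;
  }
}

// Gather every chunk a step emits, to compare the two behaviours.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}

collect(nonStreamingStep(parserOutput())).then((chunks) => console.log(chunks)); // one chunk: the full list
collect(streamingStep(parserOutput())).then((chunks) => console.log(chunks)); // one chunk per country
```

The streaming version holds back the most recent country until a later entry starts (or the input ends), because until then its name may still be incomplete.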
Now, let's confirm that with streamEvents we're still seeing streaming output from the model and the parser.
const eventStream = await chain.streamEvents(
`output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 30) {
continue;
}
const eventType = event.event;
if (eventType === "on_llm_stream") {
console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
} else if (eventType === "on_parser_stream") {
console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
}
eventCount += 1;
}
Chat model chunk:
Chat model chunk: Here's
Chat model chunk: how
Chat model chunk: you
Chat model chunk: can
Chat model chunk: represent
Chat model chunk: the
Chat model chunk: countries
Chat model chunk: France
Chat model chunk: ,
Chat model chunk: Spain
Chat model chunk: ,
Chat model chunk: and
Chat model chunk: Japan
Chat model chunk: ,
Chat model chunk: along
Chat model chunk: with
Chat model chunk: their
Chat model chunk: populations
Chat model chunk: ,
Chat model chunk: in
Chat model chunk: JSON
Chat model chunk: format
Chat model chunk: :
Chat model chunk: ```
Chat model chunk: json
Chat model chunk:
Chat model chunk: {