Why AI APIs Use Streaming

Modern AI APIs stream responses using Server-Sent Events (SSE) instead of making the client wait for the complete response:

Without Streaming

User: "Explain quantum computing"

[5 second wait...]

AI: "Quantum computing is a type of computation that..."

With Streaming

User: "Explain quantum computing"

AI: "Quantum"
AI: " computing"
AI: " is"
AI: " a"
AI: " type"
...
Benefits:
  • ⚡ Faster perceived response time
  • 📱 Better user experience (see response forming)
  • 🔄 Can cancel long responses early
  • 🛠️ Get tool calls before full response completes

Server-Sent Events (SSE) Format

AI APIs send responses as SSE streams:
data: {"id":"1","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"1","choices":[{"delta":{"content":" there"}}]}

data: {"id":"1","choices":[{"delta":{"content":"!"}}]}

data: [DONE]
Each data: line is one chunk; combined, the chunks read “Hello there!”
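
To make that combining step concrete, here is a minimal TypeScript sketch of what an SSE consumer does: split the stream into lines, parse each data: payload, and concatenate the content deltas. The function name is illustrative, not part of StreamParser's API:

// Minimal sketch: combine OpenAI-style SSE chunks into a single string.
function combineSseChunks(raw: string): string {
  let text = "";
  for (const line of raw.split("\n")) {
    if (!line.startsWith("data: ")) continue;         // skip blank lines
    const payload = line.slice("data: ".length).trim();
    if (payload === "[DONE]") break;                  // end-of-stream sentinel
    const event = JSON.parse(payload);
    text += event.choices?.[0]?.delta?.content ?? ""; // some chunks carry no text
  }
  return text;
}

// For the stream above: combineSseChunks(raw) === "Hello there!"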

StreamParser Block

StreamParser extracts text and tool calls from SSE streams:
{
  "block": "StreamParser",
  "input": "${response.body}",
  "config": {
    "format": "sse-openai"
  },
  "output": {
    "text": "aiMessage",
    "toolCalls": "aiTools"
  }
}
Output:
{
  aiMessage: "I'll search for that information for you.",
  aiTools: [
    {
      name: "search_database",
      args: { query: "user data", limit: 10 }
    }
  ]
}

Supported Formats

StreamParser supports multiple streaming formats:
Format        Provider        Description
sse-openai    OpenAI          ChatGPT API, Azure OpenAI
sse-vercel    Vercel AI SDK   Next.js AI applications
sse           Generic         Standard SSE format
text          Any             Plain text (no parsing)

Basic Usage

Parse OpenAI Stream

{
  "pipeline": [
    {
      "block": "HttpRequest",
      "input": {
        "url": "https://api.openai.com/v1/chat/completions",
        "method": "POST",
        "headers": {
          "Authorization": "Bearer ${env.OPENAI_API_KEY}",
          "Content-Type": "application/json"
        },
        "body": {
          "model": "gpt-4o-mini",
          "messages": [{
            "role": "user",
            "content": "Hello!"
          }],
          "stream": true
        }
      },
      "output": "response"
    },
    {
      "block": "StreamParser",
      "input": "${response.body}",
      "config": {
        "format": "sse-openai"
      },
      "output": {
        "text": "aiMessage"
      }
    }
  ],
  "assertions": {
    "response.status": 200,
    "aiMessage": { "minLength": 1 }
  }
}

Parse Vercel AI SDK Stream

{
  "pipeline": [
    {
      "block": "HttpRequest",
      "input": {
        "url": "${YOUR_API}/chat",
        "method": "POST",
        "body": {
          "message": "Hello!"
        }
      },
      "output": "response"
    },
    {
      "block": "StreamParser",
      "input": "${response.body}",
      "config": {
        "format": "sse-vercel"
      },
      "output": {
        "text": "aiMessage",
        "toolCalls": "tools"
      }
    }
  ]
}

Extracting Content

Text Only

{
  "output": {
    "text": "aiMessage"
  }
}
Combines all text chunks into a single string.

Tool Calls Only

{
  "output": {
    "toolCalls": "tools"
  }
}
Extracts all function/tool calls from the stream.

Both Text and Tools

{
  "output": {
    "text": "aiMessage",
    "toolCalls": "tools"
  }
}

Include Metadata

{
  "output": {
    "text": "aiMessage",
    "toolCalls": "tools",
    "metadata": "streamMeta"
  }
}
Metadata includes:
{
  format: "sse-openai",
  totalChunks: 45,
  totalTools: 3
}

Real-World Examples

1. Test ChatGPT-Style Interface

{
  "name": "Chat API Streaming Test",
  "context": {
    "OPENAI_URL": "https://api.openai.com/v1/chat/completions",
    "API_KEY": "${env.OPENAI_API_KEY}"
  },
  "tests": [{
    "id": "test-streaming-response",
    "pipeline": [
      {
        "id": "call-chat-api",
        "block": "HttpRequest",
        "input": {
          "url": "${OPENAI_URL}",
          "method": "POST",
          "headers": {
            "Authorization": "Bearer ${API_KEY}",
            "Content-Type": "application/json"
          },
          "body": {
            "model": "gpt-4o-mini",
            "messages": [{
              "role": "user",
              "content": "Explain the benefits of TypeScript in 2-3 sentences"
            }],
            "stream": true
          }
        },
        "output": "response"
      },
      {
        "id": "parse-stream",
        "block": "StreamParser",
        "input": "${response.body}",
        "config": {
          "format": "sse-openai"
        },
        "output": {
          "text": "aiMessage",
          "metadata": "streamMeta"
        }
      },
      {
        "id": "validate-content",
        "block": "ValidateContent",
        "input": {
          "from": "aiMessage",
          "as": "text"
        },
        "config": {
          "contains": ["TypeScript"],
          "minLength": 50
        },
        "output": "contentCheck"
      },
      {
        "id": "validate-quality",
        "block": "LLMJudge",
        "input": {
          "text": "${aiMessage}",
          "expected": {
            "expectedBehavior": "Explains TypeScript benefits concisely (2-3 sentences) mentioning type safety or developer experience"
          }
        },
        "output": "qualityCheck"
      }
    ],
    "assertions": {
      "response.status": 200,
      "streamMeta.totalChunks": { "gt": 0 },
      "contentCheck.passed": true,
      "qualityCheck.score": { "gte": 0.8 }
    }
  }]
}

2. Test Tool Calls in Stream

{
  "tests": [{
    "id": "test-streaming-with-tools",
    "pipeline": [
      {
        "block": "HttpRequest",
        "input": {
          "url": "${OPENAI_URL}",
          "method": "POST",
          "headers": {
            "Authorization": "Bearer ${API_KEY}"
          },
          "body": {
            "model": "gpt-4o-mini",
            "messages": [{
              "role": "user",
              "content": "Search for users with premium subscription"
            }],
            "tools": [
              {
                "type": "function",
                "function": {
                  "name": "search_users",
                  "description": "Search users database",
                  "parameters": {
                    "type": "object",
                    "properties": {
                      "filter": { "type": "string" },
                      "limit": { "type": "number" }
                    }
                  }
                }
              }
            ],
            "stream": true
          }
        },
        "output": "response"
      },
      {
        "block": "StreamParser",
        "input": "${response.body}",
        "config": {
          "format": "sse-openai"
        },
        "output": {
          "text": "aiMessage",
          "toolCalls": "tools"
        }
      },
      {
        "block": "ValidateTools",
        "input": {
          "from": "tools",
          "as": "toolCalls"
        },
        "config": {
          "expected": ["search_users"],
          "validateArgs": {
            "search_users": {
              "filter": "premium"
            }
          }
        },
        "output": "toolValidation"
      }
    ],
    "assertions": {
      "toolValidation.passed": true,
      "tools[0].name": "search_users"
    }
  }]
}

3. Test Vercel AI SDK Streaming

{
  "tests": [{
    "id": "test-vercel-ai-stream",
    "pipeline": [
      {
        "block": "HttpRequest",
        "input": {
          "url": "${YOUR_NEXTJS_API}/api/chat",
          "method": "POST",
          "headers": {
            "Content-Type": "application/json"
          },
          "body": {
            "messages": [{
              "role": "user",
              "content": "Recommend a laptop under $1000"
            }]
          }
        },
        "output": "response"
      },
      {
        "block": "StreamParser",
        "input": "${response.body}",
        "config": {
          "format": "sse-vercel"
        },
        "output": {
          "text": "aiMessage",
          "toolCalls": "tools",
          "metadata": "meta"
        }
      },
      {
        "block": "LLMJudge",
        "input": {
          "text": "${aiMessage}",
          "expected": {
            "expectedBehavior": "Recommends specific laptop models under $1000 with reasons"
          }
        },
        "output": "judgement"
      }
    ],
    "assertions": {
      "response.status": 200,
      "aiMessage": { "minLength": 20 },
      "judgement.score": { "gte": 0.8 }
    }
  }]
}

4. Test Partial Response Quality

Test response quality even if the stream is cut short:
{
  "tests": [{
    "id": "test-partial-response",
    "pipeline": [
      {
        "block": "MockData",
        "config": {
          "data": {
            "streamData": "data: {\"choices\":[{\"delta\":{\"content\":\"The\"}}]}\n\ndata: {\"choices\":[{\"delta\":{\"content\":\" capital\"}}]}\n\ndata: {\"choices\":[{\"delta\":{\"content\":\" of\"}}]}\n\n"
          }
        },
        "output": "mock"
      },
      {
        "block": "StreamParser",
        "input": "${mock.streamData}",
        "config": {
          "format": "sse-openai"
        },
        "output": {
          "text": "partial"
        }
      }
    ],
    "assertions": {
      "partial": "The capital of"
    }
  }]
}

Performance Considerations

Response Time

Streaming doesn’t shorten the total response time, but it improves perceived speed:
// Total time: Same
Traditional: [████████████] 5s
Streaming:   [████████████] 5s

// Time to first token: Much faster
Traditional: [............] wait 5s
Streaming:   [█...........] 0.2s first chunk

Testing Performance

{
  "pipeline": [
    {
      "block": "HttpRequest",
      "output": "response"
    }
  ],
  "assertions": {
    "response.duration": { "lt": 10000 }  // Under 10s total
  }
}
For time-to-first-chunk testing, you’d need custom timing logic (not built-in yet).
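
If you need that measurement today, a rough TypeScript sketch outside the framework can time the first read() on the response body. The URL and request body here are hypothetical placeholders:

// Sketch: measure time-to-first-chunk for a streaming endpoint.
async function timeToFirstChunk(url: string, body: unknown): Promise<number> {
  const start = performance.now();
  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
  });
  const reader = res.body!.getReader();
  await reader.read();               // resolves as soon as the first chunk arrives
  const elapsed = performance.now() - start;
  await reader.cancel();             // stop the stream; we only needed the first chunk
  return elapsed;
}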

Error Handling

Incomplete Streams

If the connection drops mid-stream, StreamParser still parses whatever chunks arrived (see example 4 above), so assert a minimum length to catch truncated output:
{
  "pipeline": [
    {
      "block": "StreamParser",
      "input": "${response.body}",
      "output": {
        "text": "aiMessage",
        "metadata": "meta"
      }
    }
  ],
  "assertions": {
    "aiMessage": { "minLength": 1 }
  }
}

Malformed SSE

StreamParser handles common issues, as sketched after this list:
  • Missing data: prefix
  • Invalid JSON in chunks
  • Incomplete tool call objects
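
In practice that tolerance amounts to skipping what can't be decoded instead of failing the whole parse. A rough TypeScript illustration (not StreamParser's actual internals):

// Sketch: skip malformed chunks rather than aborting the whole parse.
function parseLeniently(raw: string): string {
  let text = "";
  for (const rawLine of raw.split("\n")) {
    // Tolerate a missing "data: " prefix by treating the line as the payload.
    const payload = rawLine.startsWith("data: ") ? rawLine.slice(6) : rawLine.trim();
    if (!payload || payload === "[DONE]") continue;
    try {
      const event = JSON.parse(payload);
      text += event.choices?.[0]?.delta?.content ?? "";
    } catch {
      continue; // invalid JSON in this chunk — ignore it and keep going
    }
  }
  return text;
}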

Timeout Handling

{
  "block": "HttpRequest",
  "input": {
    "url": "${STREAMING_API}",
    "timeout": 30000  // 30s timeout for long streams
  }
}

Combining with Validation Blocks

StreamParser → ValidateContent → LLMJudge

{
  "pipeline": [
    {
      "block": "HttpRequest",
      "output": "response"
    },
    {
      "id": "parse",
      "block": "StreamParser",
      "input": "${response.body}",
      "output": {
        "text": "aiMessage",
        "toolCalls": "tools"
      }
    },
    {
      "id": "check-keywords",
      "block": "ValidateContent",
      "input": {
        "from": "aiMessage",
        "as": "text"
      },
      "config": {
        "contains": ["order", "confirmed"],
        "minLength": 20
      },
      "output": "keywordCheck"
    },
    {
      "id": "check-tools",
      "block": "ValidateTools",
      "input": {
        "from": "tools",
        "as": "toolCalls"
      },
      "config": {
        "expected": ["create_order"]
      },
      "output": "toolCheck"
    },
    {
      "id": "check-quality",
      "block": "LLMJudge",
      "input": {
        "text": "${aiMessage}",
        "expected": {
          "expectedBehavior": "Professional order confirmation with next steps"
        }
      },
      "output": "qualityCheck"
    }
  ],
  "assertions": {
    "keywordCheck.passed": true,
    "toolCheck.passed": true,
    "qualityCheck.score": { "gte": 0.85 }
  }
}

Streaming vs Non-Streaming

Streaming:
Pros:
  • Better UX (see response forming)
  • Can cancel long responses
  • Get tool calls early
Cons:
  • More complex to parse
  • Harder to debug
  • Can’t easily inspect full response
When to use:
  • User-facing chat interfaces
  • Long-form content generation
  • Real-time feedback needed

Best Practices

Capture stream metadata so you can assert on chunk and tool counts:
{
  "output": {
    "metadata": "meta"
  }
}
Set a timeout that matches the expected generation length:
{
  "block": "HttpRequest",
  "input": {
    "timeout": 30000  // 30s for normal
    // OR
    "timeout": 120000  // 2min for long generations
  }
}
Streaming can take longer than traditional requests.
Run cheap structural checks before expensive semantic ones:
{
  "pipeline": [
    { "block": "StreamParser" },
    { "block": "ValidateContent" },  // Structure (fast, free)
    { "block": "LLMJudge" }           // Semantics (slower, costs $)
  ]
}
Test the edge cases:
  • Empty streams
  • Incomplete streams (connection drops)
  • Very long responses (10,000+ tokens)
  • Multiple tool calls in one stream
  • Mixed text and tool calls
Extract only the outputs you need:
// ✅ Good - only extract text
{
  "output": {
    "text": "aiMessage"
  }
}

// ❌ Wasteful - extract everything
{
  "output": {
    "text": "aiMessage",
    "toolCalls": "tools",
    "chunks": "allChunks",
    "metadata": "meta"
  }
}

Debugging Streams

Problem: Empty aiMessage

Check:
  1. Is the stream format correct?
  2. Look at raw response.body
{
  "assertions": {
    "response.body": true  // Print raw stream
  }
}

Problem: Tool calls not extracted

Check:
  1. Correct format? (sse-openai vs sse-vercel)
  2. Are tool calls in the response?
  3. Check metadata.totalTools
{
  "output": {
    "toolCalls": "tools",
    "metadata": "meta"
  },
  "assertions": {
    "meta.totalTools": { "gt": 0 }
  }
}

Problem: Parse errors

Common causes:
  • Wrong format specified
  • Malformed JSON in chunks
  • Mixed stream formats
Solution:
{
  "config": {
    "format": "sse"  // Try generic SSE format
  }
}

Format Details

OpenAI Format (sse-openai)

data: {"id":"1","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"1","choices":[{"delta":{"tool_calls":[{"function":{"name":"search"}}]},"index":0}]}

data: [DONE]
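
Note that streamed tool calls arrive fragmented: the function name typically comes in one chunk, and the arguments string is appended across several later chunks, keyed by index. A TypeScript sketch of how a consumer reassembles the parsed data: payloads (field names follow the chunk shape above):

// Sketch: reassemble streamed tool-call fragments, keyed by index.
interface StreamedToolCall { name: string; args: string }

function accumulateToolCalls(events: any[]): StreamedToolCall[] {
  const calls: StreamedToolCall[] = [];
  for (const event of events) {
    for (const tc of event.choices?.[0]?.delta?.tool_calls ?? []) {
      const i = tc.index ?? 0;
      calls[i] ??= { name: "", args: "" };
      if (tc.function?.name) calls[i].name = tc.function.name;
      calls[i].args += tc.function?.arguments ?? ""; // arguments stream as raw JSON text
    }
  }
  return calls; // JSON.parse each call's args once the stream completes
}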

Vercel AI SDK Format (sse-vercel)

0:"Hello"
0:" world"
9:{"toolCallId":"123","toolName":"search"}
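
Each line here is a type code, a colon, and a JSON value: in this example, 0 carries a text delta and 9 carries a tool call. A rough TypeScript sketch of that framing:

// Sketch: split Vercel-style "<code>:<json>" lines into text and tool calls.
function parseVercelStream(raw: string): { text: string; toolCalls: unknown[] } {
  let text = "";
  const toolCalls: unknown[] = [];
  for (const line of raw.split("\n")) {
    const sep = line.indexOf(":");
    if (sep === -1) continue;                        // not a protocol line
    const code = line.slice(0, sep);
    const payload = JSON.parse(line.slice(sep + 1)); // the payload is a JSON value
    if (code === "0") text += payload;               // text delta
    else if (code === "9") toolCalls.push(payload);  // tool call
  }
  return { text, toolCalls };
}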

Generic SSE Format (sse)

data: Any text content here

data: More content

data: Even more
