export const http_stream = async ( url: string, params: ReqT, events: { onStart?: (id: string, created_at?: number, messageId?: string | null) => void onTextChunk?: (message: string) => void onComplete?: (id: string | null, finished_at?: number, messageId?: string | null) => void }) => { const { onStart, onTextChunk, onComplete } = events const runtimeConfig = useRuntimeConfig() const response = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${runtimeConfig.public.DifyApiKey}`, }, body: JSON.stringify(params), }) if (!response) { throw new Error('Network response was not ok') } const reader = response.body?.getReader() const decoder = new TextDecoder('utf-8') let buffer = '' while (true) { const { done, value } = await reader?.read() || {} if (done) break buffer += decoder.decode(value, { stream: true }) const parts = buffer.split('\n\n') buffer = parts.pop()! for (const part of parts) { if (!part.trim().startsWith('data:')) continue const payload = part.replace(/^data:\s*/, '').trim() try { const obj = JSON.parse(payload) if (obj?.event && obj.event === 'workflow_started') { onStart?.(obj?.data?.id || null, obj?.data?.created_at || 0, obj?.message_id || null) } if (obj?.event && obj.event === 'workflow_finished') { onComplete?.(obj?.data?.id || null, obj?.data?.finished_at || 0, obj?.message_id || null) return } // Workflow if (obj?.event && obj.event === 'text_chunk') { let text_chunk = obj.data?.text as string if (text_chunk) { if (text_chunk.startsWith('<') && text_chunk.endsWith('>')) { const endTag = text_chunk.match(/<\/[^>]*>/) if (endTag) { text_chunk = text_chunk.replace(endTag[0], '\n' + endTag[0]) } } onTextChunk?.(text_chunk) } } // Chatflow if (obj?.event && obj.event === 'message') { let ans = obj.answer if (ans) { if (ans.startsWith('<') && ans.endsWith('>')) { const endTag = ans.match(/<\/[^>]*>/) if (endTag) { ans = ans.replace(endTag[0], '\n' + endTag[0]) } } onTextChunk?.(ans) } } } catch { // ignore } } } onComplete?.(null) }