
    Yj                        d Z ddlmZ ddlZddlZddlZddlmZmZm	Z	m
Z
 ddlmZmZ  ej        e          Z G d d          ZddddZddZddZdS )z+SSE (Server-Sent Events) parsing utilities.    )annotationsN)AsyncGeneratorAsyncIterable	GeneratorIterable)Anycastc                  `    e Zd ZdZddZedd            Zdd
ZddZddddZ	ddZ
ddZdS )SSEStreamParsera  
    Incrementally parse an SSE byte stream into content strings.

    This parser is designed for real network streaming where byte chunks may split UTF-8
    codepoints and/or split lines arbitrarily. It maintains:

    - An incremental UTF-8 decoder to safely decode across chunk boundaries.
    - A text buffer to safely assemble complete lines across chunk boundaries.

    The Honcho streaming format is expected to include lines in the form `data:<json>` or
    `data: <json>`. Each data line should contain a JSON object with:

    - `done: true` to indicate stream completion.
    - `delta.content` containing incremental text.

    Any JSON decoding failures are logged with the same warning format as the legacy
    parser, including a preview of the data payload.
    returnNonec                h     t          j        d          d          | _        d| _        d| _        d S )Nzutf-8replace)errors F)codecsgetincrementaldecoder_decoder_text_buffer_doneselfs    W/home/ubuntu/.hermes/hermes-agent/venv/lib/python3.11/site-packages/honcho/utils/sse.py__init__zSSEStreamParser.__init__"   sD    4
63O4
 4
4 4 4 "$ 


    boolc                    | j         S )z6Whether the stream has emitted a `done: true` message.)r   r   s    r   donezSSEStreamParser.done)   s     zr   chunkbytesGenerator[str, None, None]c              #     K   | j         s|sdS | j                            |d          }|r| xj        |z  c_        |                                 E d{V  dS )aH  
        Feed the next bytes from the SSE stream and yield any newly available content.

        Args:
            chunk: Raw bytes from the SSE stream.

        Yields:
            Content strings extracted from any complete `data:` lines decoded from this
            chunk (and any previously buffered partial data).
        NFfinalr   r   decoder   _drain_complete_lines)r   r   decodeds      r   feedzSSEStreamParser.feed.   s~       : 	U 	F-&&uE&:: 	)(--///////////r   c              #     K   | j         rdS | j                            dd          }|r| xj        |z  c_        |                     d          E d{V  dS )z
        Finalize the stream and yield any remaining content.

        This should be called once the underlying byte stream is finished to flush any
        remaining decoder/buffer state.
        Nr   Tr#   flush_partialr%   )r   r(   s     r   finalizezSSEStreamParser.finalizeB   s|       : 	F-&&s$&77 	)(--D-AAAAAAAAAAAr   Fr+   r,   c             #     K   | j         s>|                     |          }|d S |                     |          E d {V  | j         <d S d S )Nr+   )r   	_pop_line_handle_line)r   r,   lines      r   r'   z%SSEStreamParser._drain_complete_linesR   su       * 	/>>>>>D|((.........	 * 	/ 	/ 	/ 	/ 	/r   
str | Nonec                  | j         sd S | j                             d          }| j                             d          }|dk    r|dk    r|sd S | j         }d| _         |S |dk    r|}n|dk    r|}nt          ||          }| j         |         }|dk    rG| j         d |         }| j         |dz   d          | _         |                    d          r
|d d         }|S |t	          | j                   dz
  k    r|sd S |dz   t	          | j                   k     r<| j         |dz            dk    r(| j         d |         }| j         |dz   d          | _         |S | j         d |         }| j         |dz   d          | _         |S )N
r         )r   findminendswithlen)r   r,   idx_nidx_rr1   idxseps          r   r/   zSSEStreamParser._pop_line[   s     	4!&&t,,!&&t,,B;;5B;;  t$D "DKB;;CCb[[CCeU##C$$;;$TcT*D $ 1#')) <D}}T"" !CRCyK#d'((1,,,],47S*++++0A#'0Jd0R0R$TcT*D $ 1#')) <DK #& -cAgii8r   r1   strc              #    K   |                     d          sd S |t          d          d                              d          }|sd S 	 t          j        |          }t          |t                    sd S t          t          t          t          f         |          }|
                    d          r	d| _        d S |
                    di           }t          |t                    sd S t          t          t          t          f         |          }|
                    d          }t          |t                    r|r|V  d S d S d S # t          j        $ r/}t                              d||d d                    Y d }~d S d }~ww xY w)	Nzdata: r   Tdeltacontentz/Failed to decode streaming chunk: %s (data: %s)d   )
startswithr<   lstripjsonloads
isinstancedictr	   rA   r   getr   JSONDecodeErrorloggerwarning)	r   r1   json_strparsed
chunk_data	delta_obj
delta_datarE   es	            r   r0   zSSEStreamParser._handle_line   s     w'' 	FG'..s33 	F	!Z11Ffd++ d38nf55J~~f%% !
"w33Ii.. d38ni88J nnY//G'3'' G    # 	 	 	NNA#        	s,   	)E 4AE 9+E &AE F$E<<FN)r   r   )r   r   )r   r    r   r!   )r   r!   )r,   r   r   r!   )r,   r   r   r2   )r1   rA   r   r!   )__name__
__module____qualname____doc__r   propertyr   r)   r-   r'   r/   r0    r   r   r   r      s         &! ! ! !    X0 0 0 0(B B B B" (-/ / / / / /' ' ' 'R     r   r   )parserr   r    r]   SSEStreamParser | Noner   r!   c             #     K   |Et                      }|                    |           E d{V  |                                E d{V  dS |                    |           E d{V  dS )a!  
    Parse bytes from an SSE stream and yield content strings.

    For correct handling of UTF-8 and line boundaries across network chunks, construct
    one `SSEStreamParser` per stream and pass it for each call:

    - `yield from parse_sse_chunk(chunk, parser=parser)`

    Args:
        chunk: Raw bytes from the SSE stream.
        parser: Optional persistent parser instance. If omitted, a temporary parser is
            created and finalized for this single chunk.

    Yields:
        Content strings extracted from delta objects.
    N)r   r)   r-   )r   r]   tmps      r   parse_sse_chunkra      s      & ~88E??"""""""<<>>!!!!!!!{{5!!!!!!!!!!!r   chunksIterable[bytes]c              #     K   t                      }| D ]'}|                    |          E d{V  |j        r dS (|                                E d{V  dS )z
    Parse an SSE byte stream and yield content strings.

    Args:
        chunks: An iterable of raw byte chunks from an SSE stream.

    Yields:
        Content strings extracted from delta objects, in order.
    Nr   r)   r   r-   )rb   r]   r   s      r   parse_sse_streamrf      s       F  ;;u%%%%%%%%%; 	FF	           r   AsyncIterable[bytes]AsyncGenerator[str, None]c                  K   t                      }| 2 3 d{V }|                    |          D ]}|W V  |j        r dS /6 |                                D ]}|W V  dS )z
    Parse an async SSE byte stream and yield content strings.

    Args:
        chunks: An async iterable of raw byte chunks from an SSE stream.

    Yields:
        Content strings extracted from delta objects, in order.
    Nre   )rb   r]   r   rE   s       r   parse_sse_astreamrj      s       F       e{{5)) 	 	GMMMMM; 	FF	 
 ??$$   s   A)r   r    r]   r^   r   r!   )rb   rc   r   r!   )rb   rg   r   rh   )rZ   
__future__r   r   rI   loggingcollections.abcr   r   r   r   typingr   r	   	getLoggerrW   rO   r   ra   rf   rj   r\   r   r   <module>rp      s   1 1 " " " " " "    N N N N N N N N N N N N        		8	$	$U U U U U U U Ur 7;" " " " " "6! ! ! !$     r   