o
    ˷eec                     @   s  d dl Z d dlmZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlZd dlmZ d dlmZ d dlmZ d d	lmZ ejZeeZd
Zi dd ddddddddddddddddddddd d!d"d#d$d%d&d'd(d)d*d+d,d-iZd.Ze
je
je
je
je
je
je
je
j fZ!e
jfZ"e #d/d0d1gZ$G d2d3 d3e%Z&G d4d5 d5eZ'G d6d7 d7e%Z(G d8d9 d9e%Z)d:d; Z*d<d= Z+d>d? Z,d@dA Z-G dBdC dCe%Z.dS )D    N)Enum)ResumableBidiRpc)BackgroundConsumer)
exceptions)ListenRequest)Target)TargetChange)_helpersiyP  OK	CANCELLED   UNKNOWN   INVALID_ARGUMENT   DEADLINE_EXCEEDED   	NOT_FOUND   ALREADY_EXISTS   PERMISSION_DENIED   UNAUTHENTICATED   RESOURCE_EXHAUSTED   FAILED_PRECONDITION	   ABORTED
   OUT_OF_RANGE   UNIMPLEMENTED   INTERNAL   UNAVAILABLE   	DATA_LOSS   
DO_NOT_USEzThread-OnRpcTerminatedDocTreeEntryvalueindexc                   @   sT   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )WatchDocTreec                 C   s   i | _ d| _d S )Nr   )_dict_indexself r5   V/var/www/ideatree/venv/lib/python3.10/site-packages/google/cloud/firestore_v1/watch.py__init__N   s   
zWatchDocTree.__init__c                 C   s   t | j S N)listr1   keysr3   r5   r5   r6   r:   R   s   zWatchDocTree.keysc                 C   s"   t  }| j |_| j|_|} | S r8   )r0   r1   copyr2   )r4   wdtr5   r5   r6   _copyU   s
   zWatchDocTree._copyc                 C   s,   |   } t|| j| j|< |  jd7  _| S )Nr   )r=   r-   r2   r1   )r4   keyr.   r5   r5   r6   insert\   s   zWatchDocTree.insertc                 C   s
   | j | S r8   r1   r4   r>   r5   r5   r6   findb      
zWatchDocTree.findc                 C   s   |   } | j|= | S r8   )r=   r1   rA   r5   r5   r6   removee   s   zWatchDocTree.removec                 c   s    | j D ]}|V  qd S r8   r@   r4   kr5   r5   r6   __iter__j   s   
zWatchDocTree.__iter__c                 C   s
   t | jS r8   )lenr1   r3   r5   r5   r6   __len__n   rC   zWatchDocTree.__len__c                 C   s
   || j v S r8   r@   rE   r5   r5   r6   __contains__q   rC   zWatchDocTree.__contains__N)__name__
__module____qualname__r7   r:   r=   r?   rB   rD   rG   rI   rJ   r5   r5   r5   r6   r0   J   s    r0   c                   @   s   e Zd ZdZdZdZdS )
ChangeTyper   r   r   N)rK   rL   rM   ADDEDREMOVEDMODIFIEDr5   r5   r5   r6   rN   u   s    rN   c                   @      e Zd Zdd ZdS )DocumentChangec                 C   s   || _ || _|| _|| _dS )zDocumentChange

        Args:
            type (ChangeType):
            document (document.DocumentSnapshot):
            old_index (int):
            new_index (int):
        N)typedocument	old_index	new_index)r4   rT   rU   rV   rW   r5   r5   r6   r7   |   s   

zDocumentChange.__init__NrK   rL   rM   r7   r5   r5   r5   r6   rS   {       rS   c                   @   rR   )WatchResultc                 C   s   || _ || _|| _d S r8   )snapshotnamechange_type)r4   r[   r\   r]   r5   r5   r6   r7      s   
zWatchResult.__init__NrX   r5   r5   r5   r6   rZ      rY   rZ   c                 C   s   t | tjrt| S | S )z(Wraps a gRPC exception class, if needed.)
isinstancegrpcRpcErrorr   from_grpc_error)	exceptionr5   r5   r6   _maybe_wrap_exception   s   
rc   c                 C   s   | |ksJ ddS )Nz+Document watches only support one document.r   r5   )doc1doc2r5   r5   r6   document_watch_comparator   s   rf   c                 C      t | }t|tS r8   )rc   r^   _RECOVERABLE_STREAM_EXCEPTIONSrb   wrappedr5   r5   r6   _should_recover      
rk   c                 C   rg   r8   )rc   r^   _TERMINATING_STREAM_EXCEPTIONSri   r5   r5   r6   _should_terminate   rl   rn   c                
   @   s   e Zd Zdd Zdd Zedd Zedd Zd	d
 Zdd Z	e
dd Zd.ddZdd Zdd Zdd Zdd Zdd Zdd Zdd ZejeejeejeejeejeiZd d! Zd"d# Zd$d% Zed&d' Zd(d) Z d*d+ Z!d,d- Z"dS )/Watchc                 C   sz   || _ || _|| _|| _|| _|| _|j| _t	 | _
d| _| |j d| _t | _i | _i | _d| _d| _|   dS )a  
        Args:
            firestore:
            target:
            comparator:
            snapshot_callback: Callback method to process snapshots.
                Args:
                    docs (List(DocumentSnapshot)): A callback that returns the
                        ordered list of documents stored in this snapshot.
                    changes (List(str)): A callback that returns the list of
                        changed documents since the last snapshot delivered for
                        this watch.
                    read_time (string): The ISO 8601 time at which this
                        snapshot was obtained.

            document_snapshot_cls: factory for instances of DocumentSnapshot
        FN)_document_reference
_firestore_targets_comparator_document_snapshot_cls_snapshot_callback_firestore_api_api	threadingLock_closing_closed_set_documents_pfx_database_stringresume_tokenr0   doc_treedoc_map
change_mapcurrent
has_pushed_init_stream)r4   document_reference	firestoretarget
comparatorsnapshot_callbackdocument_snapshot_clsr5   r5   r6   r7      s"   
zWatch.__init__c                 C   sP   | j }t| jjjtt|| jjd| _	| j	
| j t| j	| j| _| j  d S )N)	start_rpcshould_recovershould_terminateinitial_requestmetadata)_get_rpc_requestr   rw   
_transportlistenrk   rn   rq   _rpc_metadata_rpcadd_done_callback_on_rpc_doner   on_snapshot	_consumerstart)r4   rpc_requestr5   r5   r6   r      s   zWatch._init_streamc                 C   s"   | ||j d|jgitdt||S )a  
        Creates a watch snapshot listener for a document. snapshot_callback
        receives a DocumentChange object, but may also start to get
        targetChange and such soon

        Args:
            document_ref: Reference to Document
            snapshot_callback: callback to be called on snapshot
            document_snapshot_cls: class to make snapshots with
            reference_class_instance: class make references

        	documents)r   	target_id)_client_document_pathWATCH_TARGET_IDrf   )clsdocument_refr   r   r5   r5   r6   for_document   s   
zWatch.for_documentc                 C   s>   |j  \}}tj|| d}| ||j|jtd|j||S )N)parentstructured_query)queryr   )	_parent_parent_infor   QueryTarget_to_protobufr   _pbr   rs   )r   r   r   r   parent_path_query_targetr5   r5   r6   	for_query  s   
zWatch.for_queryc                 C   s8   | j d ur| j | jd< n| jdd  t| jj| jdS )Nr~   )database
add_target)r~   rr   popr   rq   r}   r3   r5   r5   r6   r   )  s   

zWatch._get_rpc_requestc                 C   s   | d| _ t| j | _d S )Nz/documents/)_documents_pfxrH   _documents_pfx_len)r4   database_stringr5   r5   r6   r|   3  s   zWatch._set_documents_pfxc                 C   s   | j duo| j jS )zbool: True if this manager is actively streaming.

        Note that ``False`` does not indicate this is complete shut down,
        just that it stopped getting new messages.
        N)r   	is_activer3   r5   r5   r6   r   7  s   zWatch.is_activeNc                 C   s   | j 4 | jr	 W d   dS | jrtd | j  d| _| j  d| _d| _td W d   n1 s:w   Y  |rStd|  t	|t
rO|t|dS )a  Stop consuming messages and shutdown all helper threads.

        This method is idempotent. Additional calls will have no effect.

        Args:
            reason (Any): The reason to close this. If None, this is considered
                an "intentional" shutdown.
        NzStopping consumer.TzFinished stopping manager.zreason for closing: %s)rz   r{   r   _LOGGERdebugr   stopr   closer^   	ExceptionRuntimeError)r4   reasonr5   r5   r6   r   @  s&   	



zWatch.closec                 C   s:   t d t|}tjt| jd|id}d|_|  dS )a
  Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        z.RPC termination has signaled manager shutdown.r   )r\   r   kwargsTN)	r   inforc   rx   Thread_RPC_ERROR_THREAD_NAMEr   daemonr   )r4   futurethreadr5   r5   r6   r   _  s   
zWatch._on_rpc_donec                 C   s   |    d S r8   )r   r3   r5   r5   r6   unsubscriber  s   zWatch.unsubscribec                 C   sR   t d |jd u pt|jdk}|r#|jr%| jr'| |j|j d S d S d S d S )Nz%on_snapshot: target change: NO_CHANGEr   )r   r   
target_idsrH   	read_timer   pushr~   )r4   target_changeno_target_idsr5   r5   r6   $_on_snapshot_target_change_no_changeu  s   
z*Watch._on_snapshot_target_change_no_changec                 C   s,   t d |jd }|tkrtd| d S )Nzon_snapshot: target change: ADDr   z&Unexpected target ID %s sent by server)r   r   r   r   r   )r4   r   r   r5   r5   r6   _on_snapshot_target_change_add  s
   

z$Watch._on_snapshot_target_change_addc                 C   s@   t d |jjr|jj}|jj}nd}d}d||f }t|)Nz"on_snapshot: target change: REMOVEr&   zinternal errorzError %s:  %s)r   r   causecodemessager   )r4   r   r   r   error_messager5   r5   r6   !_on_snapshot_target_change_remove  s   

z'Watch._on_snapshot_target_change_removec                 C   s   t d |   d S )Nz!on_snapshot: target change: RESET)r   r   _reset_docsr4   r   r5   r5   r6    _on_snapshot_target_change_reset  s   
z&Watch._on_snapshot_target_change_resetc                 C   s   t d d| _d S )Nz#on_snapshot: target change: CURRENTT)r   r   r   r   r5   r5   r6   "_on_snapshot_target_change_current  s   

z(Watch._on_snapshot_target_change_currentc                 C   s   | | jr|| jd  }|S r8   )
startswithr   r   )r4   document_namer5   r5   r6   _strip_document_pfx  s   zWatch._strip_document_pfxc              
   C   sZ  |du r
|    dS |j}|d}|dkr`|jj}td|  | j|}|du rAd| }t	d|  | j t
|d z	|| |j W dS  ty_ } z	td|   d}~ww |d	krtd
 t|jjv }t|jjv }	|jj}
|rtd t|
j| j}| |
j}| j|}| j||dd|
j|
jd}|| j|
j< dS |	rtd tj| j|
j< dS dS |dkrtd |jj}tj| j|< dS |dkrtd |jj}tj| j|< dS |dkrtd |jj | ! krt	d t"j#t$| j d}|%  |&  | '  | (  dS dS td d| }| j t
|d dS )aS  Process a response from the bi-directional gRPC stream.

        Collect changes and push the changes in a batch to the customer
        when we receive 'current' from the listen response.

        Args:
            proto(`google.cloud.firestore_v1.types.ListenResponse`):
                Callback method that receives a object to
        Nresponse_typer   zon_snapshot: target change: zUnknown target change type: zon_snapshot: )r   zmeth(proto) exc: document_changezon_snapshot: document changez%on_snapshot: document change: CHANGEDT)	referencedataexistsr   create_timeupdate_timez%on_snapshot: document change: REMOVEDdocument_deletez$on_snapshot: document change: DELETEdocument_removez$on_snapshot: document change: REMOVEfilterzon_snapshot: filter updatez%Filter mismatch -- restarting stream.)r\   r   zUNKNOWN TYPE. UHOHzUnknown listen response type: ))r   r   
WhichOneofr   target_change_typer   r   _target_changetype_dispatchgetr   
ValueErrorr   r   r   r   removed_target_idsrU   r	   decode_dictfieldsrq   r   r\   rt   r   r   r   rN   rP   r   r   r   count_current_sizerx   r   r   r   joinr   r   )r4   protopbwhichr   methr   exc2changedremovedrU   r   r   r   r[   r\   r   r5   r5   r6   r     s   












zWatch.on_snapshotc                 C   s   |  | j| j|\}}}| | j| j|||\}}}| jr!t|r9t| j	}	t
| |	d}
| |
|| d| _|| _|| _| j  || _dS )zInvoke the callback with a new snapshot

        Build the sntapshot from the current set of changes.

        Clear the current changes on completion.
        r>   TN)_extract_changesr   r   _compute_snapshotr   r   rH   	functools
cmp_to_keyrs   sortedr:   ru   clearr~   )r4   r   next_resume_tokendeletesaddsupdatesupdated_treeupdated_mapappliedChangesr>   r:   r5   r5   r6   r     s   




z
Watch.pushc                 C   s   g }g }g }|  D ]0\}}|tjkr|| v r|| q
|| v r.|d ur(||_|| q
|d ur5||_|| q
|||fS r8   )itemsrN   rP   appendr   )r   changesr   r   r   r   r\   r.   r5   r5   r6   r   8  s    


zWatch._extract_changesc                    s  |}|}t |t |ksJ ddd dd   fdd}g }	t| j}
t|}|D ]}|||\}}}|	| q-t||
d}td	 |D ]}td
  |||\}}}|	| qKt||
d}|D ]}||||\}}}|d ur}|	| qit |t |ksJ d|||	fS )NzJThe document tree and document map should have the same number of entries.c                 S   sP   | |v sJ d| | }||}|j}||}|| = ttj||d||fS )z
            Applies a document delete to the document tree and document map.
            Returns the corresponding DocumentChange event.
            z!Document to delete does not existr,   )r   rB   r/   rD   rS   rN   rP   )r\   r   r   old_documentexistingrV   r5   r5   r6   
delete_docX  s   


z+Watch._compute_snapshot.<locals>.delete_docc                 S   sN   | j j}||vsJ d|| d}|| j}| ||< ttj| d|||fS )z
            Applies a document add to the document tree and the document map.
            Returns the corresponding DocumentChange event.
            zDocument to add already existsNr,   )r   r   r?   rB   r/   rS   rN   rO   )new_documentr   r   r\   rW   r5   r5   r6   add_docj  s   z(Watch._compute_snapshot.<locals>.add_docc                    sv   | j j}||v sJ d||}|j| jkr6|||\}}} | ||\}}}ttj| |j|j||fS d||fS )z
            Applies a document modification to the document tree and the
            document map.
            Returns the DocumentChange event for successful modifications.
            z!Document to modify does not existN)	r   r   r   r   rS   rN   rQ   rV   rW   )r  r   r   r\   r  remove_change
add_changer	  r  r5   r6   
modify_docz  s(   



z+Watch._compute_snapshot.<locals>.modify_docr   zwalk over add_changeszin add_changeszQThe update document tree and document map should have the same number of entries.)rH   r   r   rs   r   r  r   r   )r4   r   r   delete_changesadd_changesupdate_changesr   r   r  r  r>   r\   changer[   r5   r  r6   r   M  sH   !






zWatch._compute_snapshotc                 C   s2   |  | j| jd\}}}t| jt| t| S )zsReturn the current count of all documents.

        Count includes the changes from the current changeMap.
        N)r   r   r   rH   )r4   r   r   r   r5   r5   r6   r     s   zWatch._current_sizec                 C   sH   t d | j  d| _| j D ]}|jj}t	j
| j|< qd| _dS )zG
        Helper to clear the docs on RESET or filter mismatch.
        zresetting documentsNF)r   r   r   r   r~   r   r:   r   r   rN   rP   r   )r4   r[   r\   r5   r5   r6   r     s   


zWatch._reset_docsr8   )#rK   rL   rM   r7   r   classmethodr   r   r   r|   propertyr   r   r   r   r   r   r   r   r   TargetChangeType	NO_CHANGEADDREMOVERESETCURRENTr   r   r   r   staticmethodr   r   r   r   r5   r5   r5   r6   ro      sB    >




o
qro   )/collectionsenumr   r   loggingrx   google.api_core.bidir   r   google.api_corer   r_   )google.cloud.firestore_v1.types.firestorer   r   r   google.cloud.firestore_v1r	   r  	getLoggerrK   r   r   GRPC_STATUS_CODEr   Aborted	CancelledUnknownDeadlineExceededResourceExhaustedInternalServerErrorServiceUnavailableUnauthenticatedrh   rm   
namedtupler-   objectr0   rN   rS   rZ   rc   rf   rk   rn   ro   r5   r5   r5   r6   <module>   s   
	

+