o
    ˷e                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZ ddlmZmZmZmZmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlm Z  erjddl!m"Z" e#e$Z%G dd dej&Z'G dd dej&Z(G dd dZ)G dd de)Z*G dd dZ+ej,G dd dZ-eG dd dZ.eG dd dZ/eG dd de-Z0eG dd  d e+Z1eG d!d" d"e+Z2eG d#d$ d$e+Z3eG d%d& d&e+Z4dS )'zXHelpers for efficiently writing large amounts of data to the Google Cloud
Firestore API.    N)	dataclass)CallableDictListOptionalUnionTYPE_CHECKING)
status_pb2)_helpers)BaseDocumentReference)BulkWriteBatch)RateLimiter)BatchWriteResponse)WriteResult)
BaseClientc                   @   s(   e Zd ZdZe Ze Ze ZdS )	BulkRetryz<Indicator for what retry strategy the BulkWriter should use.N)	__name__
__module____qualname____doc__enumautoexponentiallinear	immediate r   r   \/var/www/ideatree/venv/lib/python3.10/site-packages/google/cloud/firestore_v1/bulk_writer.pyr   .   s
    r   c                   @   s    e Zd ZdZe Ze ZdS )SendModezuIndicator for whether a BulkWriter should commit batches in the main
    thread or hand that work off to an executor.N)r   r   r   r   r   r   parallelserialr   r   r   r   r   <   s    r   c                   @   sx   e Zd ZdZdd Zededed fddZded	eded d
dfddZ	ddd
e
jjfddZded
efddZdS )AsyncBulkWriterMixina  
    Mixin which contains the methods on `BulkWriter` which must only be
    submitted to the executor (or called by functions submitted to the executor).
    This mixin exists purely for organization and clarity of implementation
    (e.g., there is no metaclass magic).

    The entrypoint to the parallelizable code path is `_send_batch()`, which is
    wrapped in a decorator which ensures that the `SendMode` is honored.
    c                    s   t   fdd}|S )a  Decorates a method to ensure it is only called via the executor
        (IFF the SendMode value is SendMode.parallel!).

        Usage:

            @_with_send_mode
            def my_method(self):
                parallel_stuff()

            def something_else(self):
                # Because of the decorator around `my_method`, the following
                # method invocation:
                self.my_method()
                # becomes equivalent to `self._executor.submit(self.my_method)`
                # when the send mode is `SendMode.parallel`.

        Use on entrypoint methods for code paths that *must* be parallelized.
        c                    sT   j tjkrj fddS g R i }tj }|| |S )Nc                      s   g R i S Nr   r   )argsfnkwargsselfr   r   <lambda>m   s    zGAsyncBulkWriterMixin._with_send_mode.<locals>.wrapper.<locals>.<lambda>)	
_send_moder   r   	_executorsubmit
concurrentfuturesFuture
set_result)r%   r"   r$   resultfuturer#   )r"   r$   r%   r   wrapperj   s   

z5AsyncBulkWriterMixin._with_send_mode.<locals>.wrapper)	functoolswraps)r#   r1   r   r0   r   _with_send_modeV   s   z$AsyncBulkWriterMixin._with_send_modebatch
operationsBulkWriterOperationc                 C   s\   t |}|  j|7  _| |}|  j|8  _|  jd7  _|  j|7  _| ||| dS )a(  Sends a batch without regard to rate limits, meaning limits must have
        already been checked. To that end, do not call this directly; instead,
        call `_send_until_queue_is_empty`.

        Args:
            batch(:class:`~google.cloud.firestore_v1.base_batch.BulkWriteBatch`)
           N)len_in_flight_documents_send_total_batches_sent_total_write_operations_process_response)r%   r5   r6   
_len_batchresponser   r   r   _send_batchx   s   
z AsyncBulkWriterMixin._send_batchr@   returnNc           	      C   s   t |j }| |||  t|jD ]5\}}|jdkr)| || |j| |  q|| }| 	t
||j|jd| }|rH| jd7  _| | qdS )zInvokes submitted callbacks for each batch and each operation within
        each batch. As this is called from `_send_batch()`, this is parallelized
        if we are in that mode.
        r   )	operationcodemessager8   N)list_document_referencesvalues_batch_callback	enumeratestatusrD   _success_callbackwrite_results_error_callbackBulkWriteFailurerE   attempts_retry_operation)	r%   r5   r@   r6   batch_referencesindexrK   rC   should_retryr   r   r   r>      s2   

	

z&AsyncBulkWriterMixin._process_responserC   c                 C   sb   d}| j jtjkr|jd }n
| j jtjkr|j}tj tj|d }t	
| jt||d d S )Nr      )seconds)rC   run_at)_optionsretryr   r   rP   r   datetimeutcnow	timedeltabisectinsort_retriesOperationRetry)r%   rC   delayrW   r   r   r   rQ      s   
z%AsyncBulkWriterMixin._retry_operationc                 C   s   |  S )zHook for overwriting the sending of batches. As this is only called
        from `_send_batch()`, this is parallelized if we are in that mode.
        )commitr%   r5   r   r   r   r;      s   zAsyncBulkWriterMixin._send)r   r   r   r   r4   r   r   rA   r   r>   r*   r+   r,   rQ   r;   r   r   r   r   r    K   s0    
"
*
r    c                   @   s  e Zd ZU dZdZeed< 		dGddded fd	d
Ze	de
dedd ddfddZe	dededd ddfddZe	dddd defddZdHddZdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zdedefd-d.Z	/dIded0ed1eddfd2d3Z		/dJded4eej  d1eddfd5d6Z!	7	/dKded0ed8e"ee#f d1eddf
d9d:Z$		/dJded;e%d4eej  d1eddf
d<d=Z&d>e'eed gdf ddfd?d@Z(d>e'e
ed gdf ddfdAdBZ)d>e'dd gef ddfdCdDZ*dEdF Z+dS )L
BulkWritera  
    Accumulate and efficiently save large amounts of document write operations
    to the server.

    BulkWriter can handle large data migrations or updates, buffering records
    in memory and submitting them to the server in batches of 20.

    The submission of batches is internally parallelized with a ThreadPoolExecutor,
    meaning end developers do not need to manage an event loop or worry about asyncio
    to see parallelization speed ups (which can easily 10x throughput). Because
    of this, there is no companion `AsyncBulkWriter` class, as is usually seen
    with other utility classes.

    Usage:

        # Instantiate the BulkWriter. This works from either `Client` or
        # `AsyncClient`.
        db = firestore.Client()
        bulk_writer = db.bulk_writer()

        # Attach an optional success listener to be called once per document.
        bulk_writer.on_write_result(
            lambda reference, result, bulk_writer: print(f'Saved {reference._document_path}')
        )

        # Queue an arbitrary amount of write operations.
        # Assume `my_new_records` is a list of (DocumentReference, dict,)
        # tuple-pairs that you supply.

        reference: DocumentReference
        data: dict
        for reference, data in my_new_records:
            bulk_writer.create(reference, data)

        # Block until all pooled writes are complete.
        bulk_writer.flush()

    Args:
        client(:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this BulkWriter.
       
batch_sizeNclientr   optionsBulkWriterOptionsc                 C   s   t |jdkr| n|| _|pt | _| jj| _|  |  |   t	
g | _t	
g | _d| _g | _tj| _tj| _tj| _d| _t| jj| jjd| _d| _d| _|   d S )NAsyncClientTr   )initial_tokensglobal_max_tokens)typer   _to_sync_copy_clientri   rX   moder'   _reset_operationscollectionsdequer_   _queued_batches_is_open_pending_batch_futuresrd   _default_on_successrL   _default_on_batchrI   _default_on_errorrN   r:   r   initial_ops_per_secondmax_ops_per_second_rate_limiterr<   r=   _ensure_executor)r%   rg   rh   r   r   r   __init__  s2   
zBulkWriter.__init__r5   r@   bulk_writerrB   c                 C      d S r!   r   )r5   r@   r   r   r   r   rx   ?     zBulkWriter._default_on_batch	referencer.   c                 C   r   r!   r   )r   r.   r   r   r   r   rw   G  r   zBulkWriter._default_on_successerrorrO   c                 C   s
   | j dk S )N   )rP   )r   r   r   r   r   ry   O  s   
zBulkWriter._default_on_errorc                 C   s   g | _ g | _d S r!   )_operations_operations_document_pathsr%   r   r   r   rq   V  s   
zBulkWriter._reset_operationsc                 C   s*   t | dddu s| jjr|  | _dS dS )zBReboots the executor used to send batches if it has been shutdown.r(   N)getattrr(   	_shutdown_instantiate_executorr   r   r   r   r}   Z  s   zBulkWriter._ensure_executorc                 C   s   |    |   d S r!   )r}   _send_until_queue_is_emptyr   r   r   r   _ensure_sending_  s   zBulkWriter._ensure_sendingc                 C   s
   t j S r!   )r*   r+   ThreadPoolExecutorr   r   r   r   r   c  s   
z BulkWriter._instantiate_executorc                 C   sn   | j jrdS 	 | jr|   q| js| jr|   td q| j	r/| j	}g | _	t
j| q	 | j   dS )z~
        Block until all pooled write operations are complete and then resume
        accepting new write operations.
        NTg?)r(   r   r   _enqueue_current_batchrt   r_   r   timesleeprv   r*   r+   waitshutdown)r%   _batchesr   r   r   flushf  s"   
zBulkWriter.flushc                 C   s   d| _ |   dS )z|
        Block until all pooled write operations are complete and then reject
        any further write operations.
        FN)ru   r   r   r   r   r   close  s   zBulkWriter.closec                 C   s    t | j| jkr|   dS dS )zz
        Checks to see whether the in-progress batch is full and, if it is,
        adds it to the sending queue.
        N)r9   r   rf   r   r   r   r   r   _maybe_enqueue_current_batch  s   z'BulkWriter._maybe_enqueue_current_batchc                 C   s"   | j | j |   |   dS )zAdds the current batch to the back of the sending line, resets the
        list of queued ops, and begins the process of actually sending whatever
        batch is in the front of the line, which will often be a different batch.
        N)rt   appendr   rq   r   r   r   r   r   r     s   z!BulkWriter._enqueue_current_batchc                 C   sv   |    | jr9| j }| t| t| jd}|D ]}|| q| j||d}| j	
| |    | jsdS dS )a  First domino in the sending codepath. This does not need to be
        parallelized for two reasons:

            1) Putting this on a worker thread could lead to two running in parallel
            and thus unpredictable commit ordering or failure to adhere to
            rate limits.
            2) This method only blocks when `self._request_send()` does not immediately
            return, and in that case, the BulkWriter's ramp-up / throttling logic
            has determined that it is attempting to exceed the maximum write speed,
            and so parallelizing this method would not increase performance anyway.

        Once `self._request_send()` returns, this method calls `self._send_batch()`,
        which parallelizes itself if that is our SendMode value.

        And once `self._send_batch()` is called (which does not block if we are
        sending in parallel), jumps back to the top and re-checks for any queued
        batches.

        Note that for sufficiently large data migrations, this can block the
        submission of additional write operations (e.g., the CRUD methods);
        but again, that is only if the maximum write speed is being exceeded,
        and thus this scenario does not actually further reduce performance.
        )rg   )r5   r6   N)_schedule_ready_retriesrt   popleft_request_sendr9   r   ro   add_to_batchrA   rv   r   )r%   r6   r5   opr/   r   r   r   r     s   
z%BulkWriter._send_until_queue_is_emptyc                 C   s:   t  | jtj }t|D ]}| j }||  qdS )z+Grabs all ready retries and re-queues them.N)r]   r_   rZ   r[   ranger   rY   )r%   take_until_index_rY   r   r   r   r     s
   
z"BulkWriter._schedule_ready_retriesc                 C   s<   d}	 | j | jjk}|p| j|}|r|std qdS )NFTg{Gz?)r:   r|   _maximum_tokenstake_tokensr   r   )r%   rf   have_received_tokensunder_thresholdr   r   r   r     s   
zBulkWriter._request_sendr   document_datarP   c                 C   L   |    |j| jv r|   | jt|||d | j|j |   dS )a  Adds a `create` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this create operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            document_data (dict):
                Raw data to save to the server.
        r   r   rP   N)_verify_not_closed_document_pathr   r   r   r   BulkWriterCreateOperationr   )r%   r   r   rP   r   r   r   create     zBulkWriter.createoptionc                 C   r   )aT  Adds a `delete` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this delete operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        r   r   rP   N)r   r   r   r   r   r   BulkWriterDeleteOperationr   )r%   r   r   rP   r   r   r   delete?  r   zBulkWriter.deleteFmergec                 C   sN   |    |j| jv r|   | jt||||d | j|j |   dS )a  Adds a `set` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this set operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            document_data (dict):
                Raw data to save to the server.
            merge (bool):
                Whether or not to completely overwrite any existing data with
                the supplied data.
        r   r   r   rP   N)r   r   r   r   r   r   BulkWriterSetOperationr   )r%   r   r   r   rP   r   r   r   setf  s   zBulkWriter.setfield_updatesc                 C   sb   |j jdkr
td|   |j| jv r|   | jt	||||d | j|j | 
  dS )a  Adds an `update` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this update operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            field_updates (dict):
                Key paths to specific nested data that should be upated.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        ExistsOptionz5you must not pass an explicit write option to update.r   r   r   rP   N)	__class__r   
ValueErrorr   r   r   r   r   r   BulkWriterUpdateOperationr   )r%   r   r   r   rP   r   r   r   update  s   zBulkWriter.updatecallbackc                 C      |pt j| _dS )zISets a callback that will be invoked once for every successful operation.N)rd   rw   rL   r%   r   r   r   r   on_write_result     zBulkWriter.on_write_resultc                 C   r   )zESets a callback that will be invoked once for every successful batch.N)rd   rx   rI   r   r   r   r   on_batch_result  r   zBulkWriter.on_batch_resultc                 C   r   )zYSets a callback that will be invoked once for every batch that contains
        an error.N)rd   ry   rN   r   r   r   r   on_write_error  r   zBulkWriter.on_write_errorc                 C   s   | j stdd S )Nz5BulkWriter is closed and cannot accept new operations)ru   	Exceptionr   r   r   r   r     s   zBulkWriter._verify_not_closed)NN)rB   N)r   )Nr   )Fr   ),r   r   r   r   rf   int__annotations__r   r~   staticmethodr   r   rx   r   r   rw   boolry   rq   r}   r   r   r   r   r   r   r   r   r   r   r   r
   WriteOptionr   r   rF   r   dictr   r   r   r   r   r   r   r   r   r   rd      s   
 *
=
81!
*
+

0
0


rd   c                   @   s   e Zd ZdZdefddZdS )r7   a  Parent class for all operation container classes.

    `BulkWriterOperation` exists to house all the necessary information for a
    specific write task, including meta information like the current number of
    attempts. If a write fails, it is its wrapper `BulkWriteOperation` class
    that ferries it into its next retry without getting confused with other
    similar writes to the same document.
    r5   c                 C   s   t |tsJ t | tr|j| j| jdS t | tr#|j| j| jdS t | t	r3|j
| j| j| jdS t | trC|j| j| j| jdS td| jj d)z"Adds `self` to the supplied batch.)r   r   )r   r   )r   r   r   )r   r   r   Unexpected type of z
 for batch)
isinstancer   r   r   r   r   r   r   r   r   r   r   r   r   r   	TypeErrorr   r   rc   r   r   r   r     s4   



z BulkWriterOperation.add_to_batchN)r   r   r   r   r   r   r   r   r   r   r7     s    	r7   c                   @   s,   e Zd ZdZdddZdeddfd	d
ZdS )BaseOperationRetryzParent class for both the @dataclass and old-style `OperationRetry`
    classes.

    Methods on this class be moved directly to `OperationRetry` when support for
    Python 3.6 is dropped and `dataclasses` becomes universal.
    otherr`   c                 C   s0   t |tr| j|jk S t |tjr| j|k S tS )zAllows use of `bisect` to maintain a sorted list of `OperationRetry`
        instances, which in turn allows us to cheaply grab all that are ready to
        run.)r   r`   rW   rZ   NotImplemented)r%   r   r   r   r   __lt__  s
   

zBaseOperationRetry.__lt__r   rB   Nc                 C   s   t | jtr|j| jj| jj| jjd dS t | jtr,|j| jj| jj	| jjd dS t | jt
rE|j| jj| jj| jj| jjd dS t | jtr^|j| jj| jj| jj	| jjd dS td| jjj d)zCall this after waiting any necessary time to re-add the enclosed
        operation to the supplied BulkWriter's internal queue.r   r   r   r   r   z for OperationRetry.retryN)r   rC   r   r   r   r   rP   r   r   r   r   r   r   r   r   r   r   r   r   )r%   r   r   r   r   rY     s:   



zBaseOperationRetry.retry)r   r`   )r   r   r   r   r   rd   rY   r   r   r   r   r     s    

r   c                   @   sB   e Zd ZU dZeed< dZeed< ejZ	eed< e
jZe
ed< dS )ri   i  rz   r{   rp   rY   N)r   r   r   rz   r   r   r{   r   r   rp   r   r   rY   r   r   r   r   ri   @  s
   
 ri   c                   @   s8   e Zd ZU eed< eed< eed< edefddZdS )rO   rC   rD   rE   rB   c                 C   s   | j jS r!   )rC   rP   r   r   r   r   rP   O  s   zBulkWriteFailure.attemptsN)	r   r   r   r7   r   r   strpropertyrP   r   r   r   r   rO   H  s   
 rO   c                   @   s$   e Zd ZU dZeed< ejed< dS )r`   zRContainer for an additional attempt at an operation, scheduled for
    the future.rC   rW   N)r   r   r   r   r7   r   rZ   r   r   r   r   r`   T  s   
 r`   c                   @   s.   e Zd ZU dZeed< eed< dZeed< dS )r   z-Container for BulkWriter.create() operations.r   r   r   rP   N)	r   r   r   r   r   r   r   rP   r   r   r   r   r   r   ]  s
   
 r   c                   @   s<   e Zd ZU dZeed< eed< eej	 ed< dZ
eed< dS )r   z-Container for BulkWriter.update() operations.r   r   r   r   rP   N)r   r   r   r   r   r   r   r   r
   r   rP   r   r   r   r   r   r   f  s   
 r   c                   @   sB   e Zd ZU dZeed< eed< dZee	e
f ed< dZeed< dS )	r   z*Container for BulkWriter.set() operations.r   r   Fr   r   rP   N)r   r   r   r   r   r   r   r   r   r   rF   rP   r   r   r   r   r   r   p  s   
 r   c                   @   s4   e Zd ZU dZeed< eej ed< dZ	e
ed< dS )r   z-Container for BulkWriter.delete() operations.r   r   r   rP   N)r   r   r   r   r   r   r   r
   r   rP   r   r   r   r   r   r   z  s
   
 r   )5r   r]   rr   concurrent.futuresr*   rZ   r   r2   loggingr   dataclassesr   typingr   r   r   r   r   r   
google.rpcr	   google.cloud.firestore_v1r
   'google.cloud.firestore_v1.base_documentr   $google.cloud.firestore_v1.bulk_batchr   &google.cloud.firestore_v1.rate_limiterr   )google.cloud.firestore_v1.types.firestorer   %google.cloud.firestore_v1.types.writer   %google.cloud.firestore_v1.base_clientr   	getLoggerr   loggerEnumr   r   r    rd   r7   total_orderingr   ri   rO   r`   r   r   r   r   r   r   r   r   <module>   s^    
     +8		