How to correctly use grpc asynchronously (ClientAsyncReaderWriter)

7.5k Views Asked by At

I can't find a grpc example showing how to use the ClientAsyncReaderWriter (is there one?). I tried something on my own, but am having trouble with the reference counts. My question comes from tracing through the code.

struct grpc_call has a member of type gpr_refcount called ext_ref. The ClientContext C++ object wraps the grpc_call, and holds onto it in a member grpc_call *call_;. Only when ext_ref is 0, can this grpc_call pointer be deleted.

When I use grpc synchronously with ClientReader:

  • In its implementation it uses CreateCall() and PerformOps() to add to ext_ref (ext_ref == 2).
  • Then I use Pluck() which subtracts from ext_ref so that (ext_ref == 1).
  • The last use ~ClientContext() subtracts from ext_ref, so that ext_ref == 0 and deletes the call

But when I use grpc asynchronously with ClientAsyncReaderWriter:

  • First use asyncXXX(), this API use CreateCall() and register Write() (ext_ref == 2).
  • Then it uses AsyncNext() to get tag...which must use a write or read operator.
  • So ext_ref > 1 forever, unless got_event you don't handle.

I'm calling it like this:

struct Notice
{
    std::unique_ptr<
        grpc::ClientAsyncReaderWriter<ObserveNoticRequest, EventNotice>
    >                          _rw;
    ClientContext              _context;
    EventNotice                _rsp;
}

Register Thread

CompletionQueue *cq = new CompletionQueue;
Notice *notice = new Notice;
notice->rw = stub->AsyncobserverNotice(&context, cq, notice); 

// here context.call_.ext_ref is 2

Get CompletionQueue Event Thread

void *tag = NULL;
bool ok = false;
CompletionQueue::NextStatus got = CompletionQueue::NextStatus::TIMEOUT;
gpr_timespec deadline;
deadline.clock_type = GPR_TIMESPAN;
deadline.tv_sec = 0;
deadline.tv_nsec = 10000000;

got = cq->AsyncNext<gpr_timespec>(&tag, &ok, deadline);

if (GOT_EVENT == got) {
    if (tag != NULL) {
        Notice *notice = (Notice *)tag;
        notice->_rw->Read(&_rsp, notice);

        // here context.call_.ext_ref is 2.
        // now I want to stop this CompletionQueue. 

        delete notice;

        // use ~ClientContext(), ext_ref change to 1
        // but only ext_ref == 0, call_ be deleted
    }
}
1

There are 1 best solutions below

0
Noah Eisen On

Take a look at this file, client_async.cc, for good use of the ClientAsyncReaderWriter. If you still have confusion, please create a very clean reproduction of the issue, and we will look into it further.