CONTROLLER-2108

Improve DataJournal.handleWriteMessages()


    • Type: Improvement
    • Resolution: Done
    • Priority: Medium
    • Fix Version/s: 7.0.12, 9.0.2, 8.0.6
    • Component/s: clustering

      The performance benchmark we have introduced in CONTROLLER-2043 shows that writes are dominated by JournalSegmentWriter.flush(), which forces entries to be durable in persistent storage.

      In our current implementation this flush() call is invoked from two places:

      1. internally in atomix-storage, when a segment file reaches its end
      2. in DataJournal.handleWriteMessages(), just before we return, but after the message has been acknowledged

      The first case is inevitable, as we really want to be sure a particular segment is synced before we move on.

      The second case is interesting, as sal-distributed-datastore uses persistAsync() and expects the write to be durable, i.e. to have been flush()ed by the time completion is signalled.

      In sal-akka-segmented-journal terms this means that DataJournalV0 does the suboptimal, and in fact incorrect, thing here:

          @Override
          @SuppressWarnings("checkstyle:illegalCatch")
          long handleWriteMessages(final WriteMessages message) {
              final int count = message.size();
              final var writer = entries.writer();
              long bytes = 0;
      
              for (int i = 0; i < count; ++i) {
                  final long mark = writer.getLastIndex();
                  final var request = message.getRequest(i);
      
                  final var reprs = CollectionConverters.asJava(request.payload());
                  LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
                  try {
                      bytes += writePayload(writer, reprs);
                  } catch (Exception e) {
                      LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e);
                      message.setFailure(i, e);
                      writer.truncate(mark);
                      continue;
                  }
      
                  message.setSuccess(i);
              }
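              // NOTE: every request has already been acknowledged via setSuccess()
              // or setFailure() at this point; only the flush() below actually
              // makes the writes durable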
              writer.flush();
              return bytes;
          }
      

      Note how we invoke message.setSuccess() before we issue writer.flush(). That does not match the expectation of the write being durable.

      I think we want to have another stage of processing here, one which takes care of batching request synchronization: message.setSuccess() should really be called only after writer.flush() has completed. A minimal reordering of handleWriteMessages() is sketched below.
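      A minimal sketch of that reordering, reusing only the calls already present in the method above (plus java.util.ArrayList); the succeeded list is illustrative scaffolding, not existing code, and catching a flush() failure here is an assumption about its contract:

          long handleWriteMessages(final WriteMessages message) {
              final int count = message.size();
              final var writer = entries.writer();
              // Track which requests were written; acknowledgement is deferred
              final var succeeded = new ArrayList<Integer>();
              long bytes = 0;

              for (int i = 0; i < count; ++i) {
                  final long mark = writer.getLastIndex();
                  final var reprs = CollectionConverters.asJava(message.getRequest(i).payload());
                  try {
                      bytes += writePayload(writer, reprs);
                  } catch (Exception e) {
                      message.setFailure(i, e);
                      writer.truncate(mark);
                      continue;
                  }
                  succeeded.add(i);
              }

              // Make all writes durable first ...
              try {
                  writer.flush();
              } catch (Exception e) {
                  succeeded.forEach(i -> message.setFailure(i, e));
                  return bytes;
              }
              // ... and only then signal success
              succeeded.forEach(message::setSuccess);
              return bytes;
          }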

      We need an explicit unit test which, unlike PerformanceTest, uses asynchronous actor execution, where multiple write requests can be issued and processed in the background. I think that benchmark will show that, given asynchronous storage requests, we end up with a number of WriteMessages outstanding, but with a separate sync occurring for every one of them, i.e. that even when Shard submits a ton of async persistence requests which could be batched, we sync each such request separately. One possible shape for the write-generating side of that test is sketched below.
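      This is only a sketch: it uses Akka's classic Java persistence API (AbstractPersistentActor.persistAsync()), and the BatchWriter/Flood names are made up for illustration. The actor fires many persistAsync() calls while handling a single message, so the journal sees overlapping write requests:

          import java.util.concurrent.CountDownLatch;

          import akka.persistence.AbstractPersistentActor;

          final class BatchWriter extends AbstractPersistentActor {
              // Hypothetical trigger message: flood the journal with 'count' events
              record Flood(int count) { }

              private final CountDownLatch done;

              BatchWriter(final CountDownLatch done) {
                  this.done = done;
              }

              @Override
              public String persistenceId() {
                  return "batch-writer";
              }

              @Override
              public Receive createReceiveRecover() {
                  return receiveBuilder().matchAny(any -> { }).build();
              }

              @Override
              public Receive createReceive() {
                  return receiveBuilder()
                      .match(Flood.class, flood -> {
                          // persistAsync() does not stash incoming commands, so all
                          // of these writes are outstanding at the journal at once;
                          // each handler fires only once its write is durable
                          for (int i = 0; i < flood.count(); ++i) {
                              persistAsync("event-" + i, event -> done.countDown());
                          }
                      })
                      .build();
              }
          }

      Instrumenting the journal to count flush() invocations while this actor processes, say, Flood(1000) would show directly whether we sync once per WriteMessages or once per drained batch.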

      If that really is the case, we want to have a separate 'flush thread', which has a reference to the writer and an incoming queue, and essentially runs the following pseudo-loop:

   Queue<WriteMessages> toSync;

   while (true) {
       // Get all messages queued for synchronization (pseudo-operation)
       final var messages = drain(toSync);

       try {
           writer.flush();
       } catch (Exception e) {
           messages.forEach(msg -> msg.setFailure(e));
           return;
       }

       // A single flush() acknowledges every message drained above; the real
       // code would complete the individual requests inside each WriteMessages
       messages.forEach(msg -> msg.setSuccess());
   }
      

      The overall point being: if sal-distributed-datastore's Shard actor submits persistAsync() calls faster than writer.flush() can handle them one by one, we should end up batching, with multiple persistAsync() calls being finished by a single writer.flush(). A more concrete sketch of that thread follows.
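      A concrete sketch of that flush thread, with assumptions called out explicitly: SyncBatch and JournalWriter are hypothetical stand-ins for the real completion and writer types, while BlockingQueue.take() plus drainTo() provide the 'wait for one batch, then grab everything else queued' behaviour:

          import java.util.ArrayList;
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.LinkedBlockingQueue;

          // JournalWriter stands in for the real segment writer exposing flush();
          // SyncBatch stands in for the completions of one WriteMessages instance
          final class FlushThread implements Runnable {
              private final BlockingQueue<SyncBatch> toSync = new LinkedBlockingQueue<>();
              private final JournalWriter writer;

              FlushThread(final JournalWriter writer) {
                  this.writer = writer;
              }

              // Called from handleWriteMessages() after the entries are written,
              // instead of flushing and acknowledging inline
              void enqueue(final SyncBatch batch) {
                  toSync.add(batch);
              }

              @Override
              public void run() {
                  final var batches = new ArrayList<SyncBatch>();
                  while (true) {
                      batches.clear();
                      try {
                          // Block until at least one batch is queued ...
                          batches.add(toSync.take());
                      } catch (InterruptedException e) {
                          return;
                      }
                      // ... then pick up everything that queued up meanwhile
                      toSync.drainTo(batches);

                      try {
                          writer.flush();
                      } catch (Exception e) {
                          batches.forEach(batch -> batch.fail(e));
                          return;
                      }
                      // One flush() acknowledges every batch drained above
                      batches.forEach(SyncBatch::succeed);
                  }
              }
          }

      The key property is that while flush() is running, new batches accumulate in toSync, so the next drainTo() picks them all up: the slower flush() is relative to the rate of incoming persistAsync() calls, the larger each batch naturally becomes.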

      All of this is conjecture and needs delving into how Akka Persistence batches AtomicWrites, if at all, and how the system behaves.

            Assignee: Robert Varga (rovarga)
            Reporter: Robert Varga (rovarga)
            Votes: 0
            Watchers: 1
