Why is live audio extracted from Amazon Connect through Kinesis Video Streams noisy and incomplete?


Using the example from the Amazon Connect docs, I wrote code (see below) that pushes chunks of a live audio stream to an external WebSocket server. Per the spec, each chunk is 16-bit PCM, mono, sampled at 8 kHz. The server that receives these chunks saves each one to a separate file named with a unique timestamp. Using sox, I convert each raw audio file to a WAV file:

$ sox -r 8k -c 2 -e signed -b16 -t raw chunk-1692154346612.raw chunk-1692154346612.wav
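For reference, the same raw-to-WAV step can be done in Java by prepending a canonical 44-byte RIFF/WAVE header to the raw bytes. This is a minimal sketch, assuming the format stated in the spec (8 kHz, 16-bit signed PCM, one channel) and little-endian sample order; the class name `PcmToWav` is hypothetical. The header's channel count tells a player how to group samples into frames, so it has to match how the bytes were actually produced.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public final class PcmToWav {
    // Assumed format, per the spec quoted above: 8 kHz, 16-bit signed PCM, mono.
    static final int SAMPLE_RATE = 8000;
    static final short CHANNELS = 1;
    static final short BITS_PER_SAMPLE = 16;

    /** Prepends a canonical 44-byte RIFF/WAVE header to raw little-endian PCM bytes. */
    public static byte[] toWav(byte[] pcm) {
        int byteRate = SAMPLE_RATE * CHANNELS * BITS_PER_SAMPLE / 8;   // bytes per second
        short blockAlign = (short) (CHANNELS * BITS_PER_SAMPLE / 8);   // bytes per frame
        ByteBuffer buf = ByteBuffer.allocate(44 + pcm.length).order(ByteOrder.LITTLE_ENDIAN);
        buf.put("RIFF".getBytes(StandardCharsets.US_ASCII));
        buf.putInt(36 + pcm.length);           // size of everything after this field
        buf.put("WAVE".getBytes(StandardCharsets.US_ASCII));
        buf.put("fmt ".getBytes(StandardCharsets.US_ASCII));
        buf.putInt(16);                        // fmt chunk size for plain PCM
        buf.putShort((short) 1);               // audio format 1 = linear PCM
        buf.putShort(CHANNELS);
        buf.putInt(SAMPLE_RATE);
        buf.putInt(byteRate);
        buf.putShort(blockAlign);
        buf.putShort(BITS_PER_SAMPLE);
        buf.put("data".getBytes(StandardCharsets.US_ASCII));
        buf.putInt(pcm.length);                // size of the sample data
        buf.put(pcm);
        return buf.array();
    }
}
```

Usage: `Files.write(Paths.get("chunk-1692154346612.wav"), PcmToWav.toWav(Files.readAllBytes(Paths.get("chunk-1692154346612.raw"))));`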

Again using sox, I concatenate all WAV files into a single file:

$ sox *.wav all-chunks.wav

Shell glob expansion (i.e., *.wav) sorts the file names lexicographically, which here matches chronological order because every timestamp has the same number of digits.
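Lexicographic glob order only stays chronological while all timestamps have the same number of digits. To make the ordering explicit instead, here is a minimal sketch that sorts by the parsed timestamp, assuming the `chunk-<millis>.<ext>` naming from the sox example; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class ChunkOrder {
    // Hypothetical naming scheme, taken from the sox example: chunk-<epochMillis>.<ext>
    private static final Pattern NAME = Pattern.compile("chunk-(\\d+)\\.\\w+");

    /** Extracts the millisecond timestamp embedded in a chunk file name. */
    static long timestampOf(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("unexpected chunk file name: " + fileName);
        }
        return Long.parseLong(m.group(1));
    }

    /** Returns a copy of the file names ordered by numeric timestamp, not by string order. */
    public static List<String> chronological(List<String> fileNames) {
        List<String> sorted = new ArrayList<>(fileNames);
        sorted.sort(Comparator.comparingLong(ChunkOrder::timestampOf));
        return sorted;
    }
}
```

For example, `chronological(List.of("chunk-1000.wav", "chunk-999.wav"))` puts `chunk-999.wav` first, whereas a plain string sort would put `chunk-1000.wav` first.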

I then load all-chunks.wav into Audacity and play it. The audio is noisy and choppy throughout, and much of the call conversation is missing.
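One quick way to quantify how much audio is missing is to convert byte counts to duration. At the stated format, a single track carries 8,000 samples/s × 2 bytes = 16,000 bytes per second, so summing the sizes of the raw chunk files for one track gives the duration actually captured. A minimal sketch (the class name is hypothetical, and the format constants are assumptions taken from the spec):

```java
public final class PcmDuration {
    // Assumed format from the spec: 8,000 samples/s, 2 bytes per sample, 1 channel.
    static final int SAMPLE_RATE = 8000;
    static final int BYTES_PER_SAMPLE = 2;
    static final int CHANNELS = 1;

    /** Milliseconds of audio represented by the given number of raw PCM bytes of one track. */
    public static long durationMillis(long byteCount) {
        long bytesPerSecond = (long) SAMPLE_RATE * BYTES_PER_SAMPLE * CHANNELS; // 16,000
        return byteCount * 1000 / bytesPerSecond;
    }
}
```

For example, 240,000 bytes for one track corresponds to 15,000 ms (15 seconds) of audio. If both tracks are counted together, each second of call time accounts for twice as many bytes.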

The following code implements a handler deployed as a Lambda function:

public class ProcessAudioOverKVS implements RequestHandler<TranscriptionRequest, String>
{
  private static final Logger logger = LoggerFactory.getLogger(ProcessAudioOverKVS.class);
  private static final Regions REGION = Regions.fromName(System.getenv("APP_REGION"));
  private static final String START_SELECTOR_TYPE = System.getenv("START_SELECTOR_TYPE");
  private static final int CHUNK_SIZE_IN_KB = 4;
  private static final String RELAY_AUDIO_CHUNK_URL = System.getenv("RELAY_AUDIO_CHUNK_URL");

  /**
   * Handler function for the Lambda. TranscriptionRequest has these properties:
   * - streamARN: unique ARN that identifies live audio stream over Kinesis Video Streams.
   * - startFragmentNum: integer denoting start of audio content.
   * - connectContactId: unique ID that identifies the caller.
   * - transcriptionEnabled: boolean, always false.
   * - languageCode: set to en-US.
   * - saveCallRecording: boolean, always set to false.
   * - streamAudioFromCustomer: boolean, always true.
   * - streamAudioToCustomer: boolean, always true.
   *
   * @param request
   * @param context
   * @return
   */
  @Override
  public String handleRequest(TranscriptionRequest request, Context context)
  {
    logger.info("received request : " + request.toString());
    logger.info("received context: " + context.toString());

    boolean success = false;
    WebSocketClient wsClient = new WebSocketClient();
    try {
      request.validate();
      wsClient.Connect(RELAY_AUDIO_CHUNK_URL);
      pushAudioChunks(wsClient, request);
      success = true;
    } catch (Exception e) {
      logger.error("ProcessAudioOverKVS failed", e);
    } finally {
      try { wsClient.Disconnect(); } catch (Exception e) { logger.warn("WebSocket disconnect failed", e); }
    }
    return success ? "{ \"result\": \"Success\" }" : "{ \"result\": \"Failed\" }";
  }

  private void pushAudioChunks(WebSocketClient wsClient, TranscriptionRequest request) throws Exception
  {
    logger.debug("pushAudioChunks ...");
    String streamARN = request.getStreamARN();
    String streamName = streamARN.substring(streamARN.indexOf("/") + 1, streamARN.lastIndexOf("/"));
    String contactId = request.getConnectContactId();
    String startFragmentNum = request.getStartFragmentNum();

    InputStream kvsInputStream = KVSUtils.getInputStreamFromKVS(
      streamName, REGION, startFragmentNum, getAWSCredentials(), START_SELECTOR_TYPE
    );
    StreamingMkvReader streamingMkvReader = StreamingMkvReader.createDefault(new InputStreamParserByteSource(kvsInputStream));
    KVSContactTagProcessor tagProcessor = new KVSContactTagProcessor(contactId);
    FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(Optional.of(tagProcessor));

    List<KVSUtils.AudioChunkTrack> chunkList = KVSUtils.getByteBufferFromStream(
      streamingMkvReader, fragmentVisitor, tagProcessor, contactId, CHUNK_SIZE_IN_KB
    );
    boolean haveAudio = chunkList.size() > 0 && chunkList.get(0).audio.remaining() > 0;
    while (haveAudio) {
      for (KVSUtils.AudioChunkTrack act : chunkList) {
        if (act.audio.remaining() > 0) {
          byte[] audioBytes = new byte[act.audio.remaining()];
          act.audio.get(audioBytes);
          try {
            relayAudioChunk(wsClient, contactId, audioBytes, act.track, act.trackNum);
          } catch (Exception ex) {
            logger.error(
              "could not relay audio chunk for contactId={}, track={}, trackNum={}",
              contactId, act.track, act.trackNum, ex
            );
          }
        }
      }
      chunkList = KVSUtils.getByteBufferFromStream(
        streamingMkvReader, fragmentVisitor, tagProcessor, contactId, CHUNK_SIZE_IN_KB
      );
      haveAudio = chunkList.size() > 0 && chunkList.get(0).audio.remaining() > 0;
    }
  }

  /**
   * Relay audio chunk to Minerva.
   */
  private void relayAudioChunk(
    WebSocketClient wsClient,
    String contactId,
    byte[] audioBytes,
    String track,
    long trackNum
  ) throws Exception {
    logger.debug("relayAudioChunk ...");
    AudioChunk audioChunk = new AudioChunk();
    audioChunk.setContactId(contactId);
    audioChunk.setTrackName(track);
    audioChunk.setTrackNum(trackNum);
    audioChunk.setAudio(audioBytes);
    wsClient.SendAudioChunk(audioChunk);
  }

  /**
   * @return AWS credentials used to connect to Kinesis Video Streams. This example uses the default credentials
   * provider chain, which looks for environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY; in Lambda these
   * are populated from the function's execution role) or a credentials file on the system running this program.
   */
  private static AWSCredentialsProvider getAWSCredentials() {
    return DefaultAWSCredentialsProviderChain.getInstance();
  }
}

with KVSUtils as follows:

public final class KVSUtils
{
    private static final Logger logger = LoggerFactory.getLogger(KVSUtils.class);

    public enum TrackName {
        AUDIO_FROM_CUSTOMER("AUDIO_FROM_CUSTOMER"),
        AUDIO_TO_CUSTOMER("AUDIO_TO_CUSTOMER");

        private String name;

        TrackName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public static class AudioChunkTrack
    {
        ByteBuffer audio;
        String track;
        long trackNum;

        public AudioChunkTrack(ByteBuffer audio, String track, long trackNum)
        {
            this.audio = audio;
            this.track = track;
            this.trackNum = trackNum;
        }
    }

    /**
     * Fetches the next ByteBuffer of size 1024 bytes from the KVS stream by parsing the frame from the MkvElement
     * Each frame has a ByteBuffer having size 1024
     *
     * @param streamingMkvReader
     * @param fragmentVisitor
     * @param tagProcessor
     * @param contactId
     * @return
     * @throws MkvElementVisitException
     */
    public static AudioChunkTrack getByteBufferFromStream(StreamingMkvReader streamingMkvReader,
                                                     FragmentMetadataVisitor fragmentVisitor,
                                                     KVSContactTagProcessor tagProcessor,
                                                     String contactId) throws MkvElementVisitException {
        while (streamingMkvReader.mightHaveNext()) {
            Optional<MkvElement> mkvElementOptional = streamingMkvReader.nextIfAvailable();
            if (mkvElementOptional.isPresent()) {
                if (tagProcessor.shouldStopProcessing()) {
                    return new AudioChunkTrack(ByteBuffer.allocate(0), null, -1);
                }
                MkvElement mkvElement = mkvElementOptional.get();
                mkvElement.accept(fragmentVisitor);
                if (MkvTypeInfos.SIMPLEBLOCK.equals(mkvElement.getElementMetaData().getTypeInfo())) {
                    MkvDataElement dataElement = (MkvDataElement) mkvElement;
                    @SuppressWarnings("unchecked")
                    Frame frame = ((MkvValue<Frame>) dataElement.getValueCopy()).getVal();
                    ByteBuffer audioBuffer = frame.getFrameData();

                    long trackNumber = frame.getTrackNumber();
                    MkvTrackMetadata metadata = fragmentVisitor.getMkvTrackMetadata(trackNumber);
                    return new AudioChunkTrack(audioBuffer, metadata.getTrackName(), trackNumber);
                }
            }
        }

        return new AudioChunkTrack(ByteBuffer.allocate(0), null, -1);
    }

    private static AudioChunkTrack aggregateChunks(List<AudioChunkTrack> subChunkList, int length, String track)
    {
        // No aggregation if 0 or 1 chunk only.
        if (subChunkList.size() <= 0) return new AudioChunkTrack(ByteBuffer.allocate(0), null, -1);
        if (subChunkList.size() == 1) return subChunkList.get(0);

        // Assume trackNum of 1st chunk is same for all chunks with same track name.
        long trackNum = subChunkList.get(0).trackNum;

        ByteBuffer combinedByteBuffer = ByteBuffer.allocate(length);
        for (AudioChunkTrack sub : subChunkList) {
            combinedByteBuffer.put(sub.audio);
        }
        AudioChunkTrack agg = new AudioChunkTrack(combinedByteBuffer, track, trackNum);
        agg.audio.flip();
        return agg;
    }

    /**
     * Fetches ByteBuffer of provided size from the KVS stream by repeatedly calling {@link KVSUtils#getByteBufferFromStream}
     * and concatenating the ByteBuffers to create a single chunk
     *
     * @param streamingMkvReader
     * @param fragmentVisitor
     * @param tagProcessor
     * @param contactId
     * @param chunkSizeInKB
     * @return
     * @throws MkvElementVisitException
     */
    public static List<AudioChunkTrack> getByteBufferFromStream(StreamingMkvReader streamingMkvReader,
                                                     FragmentMetadataVisitor fragmentVisitor,
                                                     KVSContactTagProcessor tagProcessor,
                                                     String contactId,
                                                     int chunkSizeInKB) throws MkvElementVisitException {

        List<AudioChunkTrack> chunkList = new ArrayList<>();

        for (int i = 0; i < chunkSizeInKB; i++) {
            AudioChunkTrack chunk = KVSUtils.getByteBufferFromStream(streamingMkvReader, fragmentVisitor, tagProcessor, contactId);
            if (chunk.audio.remaining() > 0) {
                chunkList.add(chunk);
            } else {
                break;
            }
        }
        if (chunkList.size() <= 0) return chunkList;

        // Collate sequential chunks by track.
        String lastTrack = null;
        int length = 0, totalLength = 0;
        List<AudioChunkTrack> subChunkList = new ArrayList<>();
        List<AudioChunkTrack> collatedChunks = new ArrayList<>();

        for (AudioChunkTrack act : chunkList) {
            if (lastTrack != null && !lastTrack.equals(act.track)) {
                // Aggregate if there is any change of track.
                collatedChunks.add(aggregateChunks(subChunkList, length, lastTrack));
                subChunkList.clear();
                length = 0;
            }
            subChunkList.add(act);
            length += act.audio.remaining();
            lastTrack = act.track;
            totalLength += act.audio.remaining();
        }
        if (subChunkList.size() > 0 && length > 0) {
            // Aggregate any leftover, ungrouped chunks.
            collatedChunks.add(aggregateChunks(subChunkList, length, lastTrack));
        }

        if (totalLength <= 0) {
            return new ArrayList<AudioChunkTrack>();
        }
        return collatedChunks;
    }

    /**
     * Makes a GetMedia call to KVS and retrieves the InputStream corresponding to the given streamName and startFragmentNum
     *
     * @param streamName
     * @param region
     * @param startFragmentNum
     * @param awsCredentialsProvider
     * @return
     */
    public static InputStream getInputStreamFromKVS(String streamName,
                                                    Regions region,
                                                    String startFragmentNum,
                                                    AWSCredentialsProvider awsCredentialsProvider,
                                                    String startSelectorType) {
        Validate.notNull(streamName);
        Validate.notNull(region);
        Validate.notNull(startFragmentNum);
        Validate.notNull(awsCredentialsProvider);

        AmazonKinesisVideo amazonKinesisVideo = AmazonKinesisVideoClientBuilder.standard()
                .withRegion(region)
                .withCredentials(awsCredentialsProvider)
                .build();

        String endPoint = amazonKinesisVideo.getDataEndpoint(new GetDataEndpointRequest()
                .withAPIName(APIName.GET_MEDIA)
                .withStreamName(streamName)).getDataEndpoint();

        AmazonKinesisVideoMediaClientBuilder amazonKinesisVideoMediaClientBuilder = AmazonKinesisVideoMediaClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, region.getName()))
                .withCredentials(awsCredentialsProvider);
        AmazonKinesisVideoMedia amazonKinesisVideoMedia = amazonKinesisVideoMediaClientBuilder.build();

        StartSelector startSelector;
        startSelectorType = isNullOrEmpty(startSelectorType) ? "NOW" : startSelectorType;
        switch (startSelectorType) {
            case "FRAGMENT_NUMBER":
                startSelector = new StartSelector()
                        .withStartSelectorType(StartSelectorType.FRAGMENT_NUMBER)
                        .withAfterFragmentNumber(startFragmentNum);
                logger.info("StartSelector set to FRAGMENT_NUMBER: " + startFragmentNum);
                break;
            case "NOW":
            default:
                startSelector = new StartSelector()
                        .withStartSelectorType(StartSelectorType.NOW);
                logger.info("StartSelector set to NOW");
                break;
        }

        GetMediaResult getMediaResult = amazonKinesisVideoMedia.getMedia(new GetMediaRequest()
                .withStreamName(streamName)
                .withStartSelector(startSelector));

        logger.info("GetMedia called on stream {} response {} requestId {}", streamName,
                getMediaResult.getSdkHttpMetadata().getHttpStatusCode(),
                getMediaResult.getSdkResponseMetadata().getRequestId());

        return getMediaResult.getPayload();
    }
}

In KVSUtils.getByteBufferFromStream, I do not segregate returned chunks by track; in addition, when successive chunks are on the same track, I collate them for efficiency. Those changes aside, the logic is essentially the same as the code furnished in the Amazon Connect docs article referenced at the top.

My thinking is this: I don't need a separate thread for each track. I simply read the next available audio chunk from the Kinesis video stream and forward it to the WebSocket interface. The receiving end can then segregate chunks by track.
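On the receiving side, segregating by track can be as simple as keeping one buffer per track name and appending each chunk as it arrives. Each buffer then holds a single mono PCM stream that can be written out (or wrapped in a WAV header) on its own, instead of saving every chunk into one shared, timestamp-ordered sequence. A minimal sketch, assuming the server can read the track name and audio bytes that relayAudioChunk sets on AudioChunk; `TrackDemuxer` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

public final class TrackDemuxer {
    // One growing PCM buffer per track name, e.g. AUDIO_FROM_CUSTOMER / AUDIO_TO_CUSTOMER.
    private final Map<String, ByteArrayOutputStream> tracks = new LinkedHashMap<>();

    /** Appends one received chunk to the buffer for its track, preserving arrival order. */
    public void accept(String trackName, byte[] audio) {
        tracks.computeIfAbsent(trackName, k -> new ByteArrayOutputStream())
              .write(audio, 0, audio.length);
    }

    /** Returns the concatenated mono PCM for one track (empty if nothing arrived). */
    public byte[] pcmFor(String trackName) {
        ByteArrayOutputStream out = tracks.get(trackName);
        return out == null ? new byte[0] : out.toByteArray();
    }
}
```

Keeping the per-track buffers separate means each output file contains only one speaker's samples, so it can be converted with a one-channel format without chunks from the other track being spliced in between.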

QUESTION: why does this logic result in a noisy and incomplete audio sample?

Answer by Kode Charlie:

The audio signal extracted by the Lambda function (through Kinesis Video Streams) was incomplete because the function's timeout was set to 15 seconds (the Lambda default is 3 seconds), so each invocation was stopped after about 15 seconds. In short, my audio was always truncated to roughly 15 seconds no matter how long the conversation lasted.

To lift that limit, open the Configuration tab for the Lambda function and raise the timeout under General configuration. Note that Lambda caps the timeout at 15 minutes, so calls longer than that will still be truncated.
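Even with a longer timeout, a call can outlast a single invocation, so it may help to stop relaying slightly before the deadline rather than being terminated mid-chunk. A minimal sketch, assuming the loop in pushAudioChunks is changed to check the guard (for example `while (haveAudio && guard.hasTime())`); `DeadlineGuard` and the 2-second margin are hypothetical, while `getRemainingTimeInMillis()` is a method of the Lambda Java runtime's `Context` interface.

```java
import java.util.function.LongSupplier;

public final class DeadlineGuard {
    private final LongSupplier remainingMillis;
    private final long safetyMarginMillis;

    /**
     * @param remainingMillis    source of remaining invocation time; in a Lambda handler this
     *                           could be context::getRemainingTimeInMillis
     * @param safetyMarginMillis time reserved for closing the WebSocket and returning
     */
    public DeadlineGuard(LongSupplier remainingMillis, long safetyMarginMillis) {
        this.remainingMillis = remainingMillis;
        this.safetyMarginMillis = safetyMarginMillis;
    }

    /** True while more than the safety margin remains, i.e. relaying may continue. */
    public boolean hasTime() {
        return remainingMillis.getAsLong() > safetyMarginMillis;
    }
}
```

In the handler this might be created as `new DeadlineGuard(context::getRemainingTimeInMillis, 2000)` and passed into pushAudioChunks; what to do with the rest of the call after stopping (for example, resuming from the last fragment number in a new invocation) is left open here.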

I believe the audio was noisy because samples from the two channels (the AUDIO_FROM_CUSTOMER and AUDIO_TO_CUSTOMER tracks) are interleaved, and the playback setup I used was not designed to handle that kind of signal structure.
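If you want a single file containing both sides of the call, one option is to build a proper two-channel stream from two per-track PCM buffers, placing one track on the left channel and the other on the right, so that a player reads consistent stereo frames. A minimal sketch, assuming both inputs are mono 16-bit PCM at the same sample rate and aligned at their first sample (a real call may need timestamp-based alignment); the class name is hypothetical.

```java
public final class StereoInterleaver {
    /**
     * Interleaves two mono 16-bit PCM tracks into one 2-channel stream:
     * frame i = [left sample i][right sample i], 4 bytes per frame.
     * Sample bytes are copied as-is, so the input byte order is preserved.
     * The shorter track is padded with silence; a trailing odd byte is ignored.
     */
    public static byte[] interleave(byte[] left, byte[] right) {
        int leftSamples = left.length / 2;
        int rightSamples = right.length / 2;
        int frames = Math.max(leftSamples, rightSamples);
        byte[] out = new byte[frames * 4];  // new arrays are zero-filled, i.e. silence
        for (int i = 0; i < frames; i++) {
            if (i < leftSamples) {
                out[4 * i] = left[2 * i];
                out[4 * i + 1] = left[2 * i + 1];
            }
            if (i < rightSamples) {
                out[4 * i + 2] = right[2 * i];
                out[4 * i + 3] = right[2 * i + 1];
            }
        }
        return out;
    }
}
```

The result is the kind of input that `sox -r 8k -c 2 -e signed -b16 -t raw` expects: 8,000 frames per second, each frame holding one 16-bit sample per channel.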

To sum up, the logic posted in the question works.