Using the example from the Amazon Connect docs, I wrote the logic below to push chunks from a live audio stream out to an external WebSocket server. Per the specs, each chunk is 16-bit, mono-channel PCM audio sampled at 8 kHz. The server that receives the audio chunks saves each chunk to a separate file, named with a unique timestamp (a sketch of the receiving endpoint appears after the sox commands below). Using sox, I convert each raw audio file to a WAV file:
$ sox -r 8k -c 2 -e signed -b16 -t raw chunk-1692154346612.raw chunk-1692154346612.wav
Again using sox, I concatenate all WAV files into a single file:
$ sox *.wav all-chunks.wav
Shell expansion here (i.e., *.wav) effectively sorts the WAV-file list chronologically, since the filenames embed millisecond timestamps.
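For reference, the receiving endpoint looks roughly like this — a minimal sketch using the javax.websocket API. It skips the AudioChunk envelope that SendAudioChunk actually sends and treats each binary message as raw audio; the endpoint path and output directory are illustrative:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

// Sketch of the receiving server: each binary WebSocket message (one audio
// chunk) is written to its own file, named with the arrival timestamp.
@ServerEndpoint("/relay-audio-chunk")   // illustrative path
public class AudioChunkEndpoint {
    private static final Path OUT_DIR = Paths.get("/tmp/chunks");  // illustrative directory

    @OnMessage
    public void onBinaryMessage(ByteBuffer message, Session session) throws IOException {
        byte[] bytes = new byte[message.remaining()];
        message.get(bytes);
        Files.createDirectories(OUT_DIR);
        // e.g. chunk-1692154346612.raw
        Files.write(OUT_DIR.resolve("chunk-" + System.currentTimeMillis() + ".raw"), bytes);
    }
}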
Then I load all-chunks.wav into Audacity and play it. Overall, the audio is noisy and choppy, and much of the call conversation is missing.
The following logic implements the handler, deployed as a Lambda function:
public class ProcessAudioOverKVS implements RequestHandler<TranscriptionRequest, String>
{
    private static final Logger logger = LoggerFactory.getLogger(ProcessAudioOverKVS.class);
    private static final Regions REGION = Regions.fromName(System.getenv("APP_REGION"));
    private static final String START_SELECTOR_TYPE = System.getenv("START_SELECTOR_TYPE");
    private static final int CHUNK_SIZE_IN_KB = 4;
    private static final String RELAY_AUDIO_CHUNK_URL = System.getenv("RELAY_AUDIO_CHUNK_URL");

    /**
     * Handler function for the Lambda. TranscriptionRequest has these properties:
     * - streamARN: unique ARN that identifies the live audio stream over Kinesis Video Streams.
     * - startFragmentNum: fragment number denoting the start of the audio content.
     * - connectContactId: unique ID that identifies the caller.
     * - transcriptionEnabled: boolean, always false.
     * - languageCode: set to en-US.
     * - saveCallRecording: boolean, always false.
     * - streamAudioFromCustomer: boolean, always true.
     * - streamAudioToCustomer: boolean, always true.
     *
     * @param request the incoming TranscriptionRequest
     * @param context the Lambda execution context
     * @return a small JSON string indicating success or failure
     */
    @Override
    public String handleRequest(TranscriptionRequest request, Context context)
    {
        logger.info("received request: " + request.toString());
        logger.info("received context: " + context.toString());
        boolean success = false;
        WebSocketClient wsClient = new WebSocketClient();
        try {
            request.validate();
            wsClient.Connect(RELAY_AUDIO_CHUNK_URL);
            pushAudioChunks(wsClient, request);
            success = true;
        } catch (Exception e) {
            logger.error("ProcessAudioOverKVS failed", e);
        } finally {
            // Best-effort disconnect; ignore failures on shutdown.
            try { wsClient.Disconnect(); } catch (Exception e) {}
        }
        return success ? "{ \"result\": \"Success\" }" : "{ \"result\": \"Failed\" }";
    }
    private void pushAudioChunks(WebSocketClient wsClient, TranscriptionRequest request) throws Exception
    {
        logger.debug("pushAudioChunks ...");
        String streamARN = request.getStreamARN();
        String streamName = streamARN.substring(streamARN.indexOf("/") + 1, streamARN.lastIndexOf("/"));
        String contactId = request.getConnectContactId();
        String startFragmentNum = request.getStartFragmentNum();
        InputStream kvsInputStream = KVSUtils.getInputStreamFromKVS(
            streamName, REGION, startFragmentNum, getAWSCredentials(), START_SELECTOR_TYPE
        );
        StreamingMkvReader streamingMkvReader = StreamingMkvReader.createDefault(new InputStreamParserByteSource(kvsInputStream));
        KVSContactTagProcessor tagProcessor = new KVSContactTagProcessor(contactId);
        FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(Optional.of(tagProcessor));
        List<KVSUtils.AudioChunkTrack> chunkList = KVSUtils.getByteBufferFromStream(
            streamingMkvReader, fragmentVisitor, tagProcessor, contactId, CHUNK_SIZE_IN_KB
        );
        boolean haveAudio = !chunkList.isEmpty() && chunkList.get(0).audio.remaining() > 0;
        while (haveAudio) {
            for (KVSUtils.AudioChunkTrack act : chunkList) {
                if (act.audio.remaining() > 0) {
                    byte[] audioBytes = new byte[act.audio.remaining()];
                    act.audio.get(audioBytes);
                    try {
                        relayAudioChunk(wsClient, contactId, audioBytes, act.track, act.trackNum);
                    } catch (Exception ex) {
                        logger.error(
                            "could not relay audio chunk for contactId={}, track={}, trackNum={}",
                            contactId, act.track, act.trackNum, ex
                        );
                    }
                }
            }
            chunkList = KVSUtils.getByteBufferFromStream(
                streamingMkvReader, fragmentVisitor, tagProcessor, contactId, CHUNK_SIZE_IN_KB
            );
            haveAudio = !chunkList.isEmpty() && chunkList.get(0).audio.remaining() > 0;
        }
    }
    /**
     * Relay an audio chunk to Minerva.
     */
    private void relayAudioChunk(
        WebSocketClient wsClient,
        String contactId,
        byte[] audioBytes,
        String track,
        long trackNum
    ) throws Exception {
        logger.debug("relayAudioChunk ...");
        AudioChunk audioChunk = new AudioChunk();
        audioChunk.setContactId(contactId);
        audioChunk.setTrackName(track);
        audioChunk.setTrackNum(trackNum);
        audioChunk.setAudio(audioBytes);
        wsClient.SendAudioChunk(audioChunk);
    }

    /**
     * @return AWS credentials used to connect to Kinesis Video Streams. This example uses the default credentials
     * provider chain, which looks for environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or a
     * credentials file on the system running this program.
     */
    private static AWSCredentialsProvider getAWSCredentials() {
        return DefaultAWSCredentialsProviderChain.getInstance();
    }
}
with KVSUtils as follows:
public final class KVSUtils
{
    private static final Logger logger = LoggerFactory.getLogger(KVSUtils.class);

    public enum TrackName {
        AUDIO_FROM_CUSTOMER("AUDIO_FROM_CUSTOMER"),
        AUDIO_TO_CUSTOMER("AUDIO_TO_CUSTOMER");

        private final String name;

        TrackName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public static class AudioChunkTrack
    {
        ByteBuffer audio;
        String track;
        long trackNum;

        public AudioChunkTrack(ByteBuffer audio, String track, long trackNum)
        {
            this.audio = audio;
            this.track = track;
            this.trackNum = trackNum;
        }
    }
    /**
     * Fetches the next ByteBuffer from the KVS stream by parsing a frame from the next MkvElement.
     * Each frame carries a ByteBuffer of 1024 bytes.
     *
     * @param streamingMkvReader
     * @param fragmentVisitor
     * @param tagProcessor
     * @param contactId
     * @return the next audio chunk, or an empty chunk when the stream is exhausted
     * @throws MkvElementVisitException
     */
    public static AudioChunkTrack getByteBufferFromStream(StreamingMkvReader streamingMkvReader,
                                                          FragmentMetadataVisitor fragmentVisitor,
                                                          KVSContactTagProcessor tagProcessor,
                                                          String contactId) throws MkvElementVisitException {
        while (streamingMkvReader.mightHaveNext()) {
            Optional<MkvElement> mkvElementOptional = streamingMkvReader.nextIfAvailable();
            if (mkvElementOptional.isPresent()) {
                if (tagProcessor.shouldStopProcessing()) {
                    return new AudioChunkTrack(ByteBuffer.allocate(0), null, -1);
                }
                MkvElement mkvElement = mkvElementOptional.get();
                mkvElement.accept(fragmentVisitor);
                if (MkvTypeInfos.SIMPLEBLOCK.equals(mkvElement.getElementMetaData().getTypeInfo())) {
                    MkvDataElement dataElement = (MkvDataElement) mkvElement;
                    @SuppressWarnings("unchecked")
                    Frame frame = ((MkvValue<Frame>) dataElement.getValueCopy()).getVal();
                    ByteBuffer audioBuffer = frame.getFrameData();
                    long trackNumber = frame.getTrackNumber();
                    MkvTrackMetadata metadata = fragmentVisitor.getMkvTrackMetadata(trackNumber);
                    return new AudioChunkTrack(audioBuffer, metadata.getTrackName(), trackNumber);
                }
            }
        }
        return new AudioChunkTrack(ByteBuffer.allocate(0), null, -1);
    }
    private static AudioChunkTrack aggregateChunks(List<AudioChunkTrack> subChunkList, int length, String track)
    {
        // No aggregation needed for 0 or 1 chunks.
        if (subChunkList.isEmpty()) return new AudioChunkTrack(ByteBuffer.allocate(0), null, -1);
        if (subChunkList.size() == 1) return subChunkList.get(0);
        // Assume the trackNum of the 1st chunk is the same for all chunks with the same track name.
        long trackNum = subChunkList.get(0).trackNum;
        ByteBuffer combinedByteBuffer = ByteBuffer.allocate(length);
        for (AudioChunkTrack sub : subChunkList) {
            combinedByteBuffer.put(sub.audio);
        }
        AudioChunkTrack agg = new AudioChunkTrack(combinedByteBuffer, track, trackNum);
        agg.audio.flip();
        return agg;
    }
    /**
     * Fetches ByteBuffers of the requested total size from the KVS stream by repeatedly calling
     * {@link KVSUtils#getByteBufferFromStream} and concatenating the ByteBuffers into larger chunks.
     *
     * @param streamingMkvReader
     * @param fragmentVisitor
     * @param tagProcessor
     * @param contactId
     * @param chunkSizeInKB
     * @return a list of chunks, collated by track
     * @throws MkvElementVisitException
     */
    public static List<AudioChunkTrack> getByteBufferFromStream(StreamingMkvReader streamingMkvReader,
                                                                FragmentMetadataVisitor fragmentVisitor,
                                                                KVSContactTagProcessor tagProcessor,
                                                                String contactId,
                                                                int chunkSizeInKB) throws MkvElementVisitException {
        List<AudioChunkTrack> chunkList = new ArrayList<>();
        for (int i = 0; i < chunkSizeInKB; i++) {
            AudioChunkTrack chunk = KVSUtils.getByteBufferFromStream(streamingMkvReader, fragmentVisitor, tagProcessor, contactId);
            if (chunk.audio.remaining() > 0) {
                chunkList.add(chunk);
            } else {
                break;
            }
        }
        if (chunkList.isEmpty()) return chunkList;
        // Collate sequential chunks by track.
        String lastTrack = null;
        int length = 0, totalLength = 0;
        List<AudioChunkTrack> subChunkList = new ArrayList<>();
        List<AudioChunkTrack> collatedChunks = new ArrayList<>();
        for (AudioChunkTrack act : chunkList) {
            if (lastTrack != null && !lastTrack.equals(act.track)) {
                // Aggregate whenever the track changes.
                collatedChunks.add(aggregateChunks(subChunkList, length, lastTrack));
                subChunkList.clear();
                length = 0;
            }
            subChunkList.add(act);
            length += act.audio.remaining();
            lastTrack = act.track;
            totalLength += act.audio.remaining();
        }
        if (!subChunkList.isEmpty() && length > 0) {
            // Aggregate any leftover, ungrouped chunks.
            collatedChunks.add(aggregateChunks(subChunkList, length, lastTrack));
        }
        if (totalLength <= 0) {
            return new ArrayList<>();
        }
        return collatedChunks;
    }
    /**
     * Makes a GetMedia call to KVS and retrieves the InputStream corresponding to the given streamName and startFragmentNum.
     *
     * @param streamName
     * @param region
     * @param startFragmentNum
     * @param awsCredentialsProvider
     * @param startSelectorType
     * @return the InputStream returned by GetMedia
     */
    public static InputStream getInputStreamFromKVS(String streamName,
                                                    Regions region,
                                                    String startFragmentNum,
                                                    AWSCredentialsProvider awsCredentialsProvider,
                                                    String startSelectorType) {
        Validate.notNull(streamName);
        Validate.notNull(region);
        Validate.notNull(startFragmentNum);
        Validate.notNull(awsCredentialsProvider);
        AmazonKinesisVideo amazonKinesisVideo = AmazonKinesisVideoClientBuilder.standard().build();
        String endPoint = amazonKinesisVideo.getDataEndpoint(new GetDataEndpointRequest()
            .withAPIName(APIName.GET_MEDIA)
            .withStreamName(streamName)).getDataEndpoint();
        AmazonKinesisVideoMediaClientBuilder amazonKinesisVideoMediaClientBuilder = AmazonKinesisVideoMediaClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, region.getName()))
            .withCredentials(awsCredentialsProvider);
        AmazonKinesisVideoMedia amazonKinesisVideoMedia = amazonKinesisVideoMediaClientBuilder.build();
        StartSelector startSelector;
        startSelectorType = isNullOrEmpty(startSelectorType) ? "NOW" : startSelectorType;
        switch (startSelectorType) {
            case "FRAGMENT_NUMBER":
                startSelector = new StartSelector()
                    .withStartSelectorType(StartSelectorType.FRAGMENT_NUMBER)
                    .withAfterFragmentNumber(startFragmentNum);
                logger.info("StartSelector set to FRAGMENT_NUMBER: " + startFragmentNum);
                break;
            case "NOW":
            default:
                startSelector = new StartSelector()
                    .withStartSelectorType(StartSelectorType.NOW);
                logger.info("StartSelector set to NOW");
                break;
        }
        GetMediaResult getMediaResult = amazonKinesisVideoMedia.getMedia(new GetMediaRequest()
            .withStreamName(streamName)
            .withStartSelector(startSelector));
        logger.info("GetMedia called on stream {} response {} requestId {}", streamName,
            getMediaResult.getSdkHttpMetadata().getHttpStatusCode(),
            getMediaResult.getSdkResponseMetadata().getRequestId());
        return getMediaResult.getPayload();
    }
}
In KVSUtils.getByteBufferFromStream (the list-returning overload), I do not segregate returned chunks by track; additionally, when successive chunks are on the same track, I do collate them for efficiency. Those changes aside, the logic here is essentially what the Amazon Connect docs furnish in the article cited at the top.
My thinking here is this: I don't need separate threads for each track. I simply read the next available audio chunk from the Kinesis Video Stream and forward it over the WebSocket interface; the receiving end can segregate chunks by track accordingly (a sketch of that follows).
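To make "segregate by track" concrete, here is a minimal sketch of what the receiving end could do. It assumes the server has already deserialized the relayed AudioChunk into contactId, trackName, and audio bytes; the file naming is illustrative:

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Sketch: append each incoming chunk to a per-contact, per-track file so the
// two KVS tracks (AUDIO_FROM_CUSTOMER / AUDIO_TO_CUSTOMER) never interleave.
public class TrackDemultiplexer {
    private final Map<String, OutputStream> streamsByTrack = new HashMap<>();

    public synchronized void accept(String contactId, String trackName, byte[] audio) throws IOException {
        OutputStream out = streamsByTrack.computeIfAbsent(contactId + "-" + trackName, this::open);
        out.write(audio);
    }

    private OutputStream open(String key) {
        try {
            return new FileOutputStream(key + ".raw", true); // append mode
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public synchronized void closeAll() throws IOException {
        for (OutputStream out : streamsByTrack.values()) {
            out.close();
        }
        streamsByTrack.clear();
    }
}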
QUESTION: why does this logic result in a noisy and incomplete audio sample?
The audio signal extracted by the Lambda function (through Kinesis Video Streams) was incomplete because the function's timeout (15 seconds in my deployment's default configuration) kept expiring. In short, my audio signal was always truncated to roughly 15 seconds, no matter how lengthy the conversation.
To resolve the 15-second limit, open the Configuration tab for the Lambda function and raise the timeout accordingly. NOTE that Lambda caps execution at 15 minutes.
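The same change can also be scripted. A minimal sketch using the AWS SDK for Java v1 — the function name is a placeholder, and 900 seconds is the 15-minute hard cap:

import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import com.amazonaws.services.lambda.model.UpdateFunctionConfigurationRequest;

public class RaiseLambdaTimeout {
    public static void main(String[] args) {
        AWSLambda lambda = AWSLambdaClientBuilder.defaultClient();
        // Raise the function timeout to the 900-second (15-minute) maximum.
        lambda.updateFunctionConfiguration(new UpdateFunctionConfigurationRequest()
            .withFunctionName("ProcessAudioOverKVS")   // placeholder name
            .withTimeout(900));
    }
}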
I believe the audio signal was noisy because samples from the two tracks are interleaved in the concatenated file, and the playback harness I used is not programmed to deal with such a signal structure. Segregating the chunks by track before concatenating them (as described above) avoids the interleaving.
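Once a single track's chunks are concatenated, the raw PCM can be wrapped in a standard 44-byte WAV header directly, without going through sox. A minimal sketch, assuming 16-bit mono PCM at 8 kHz per the chunk spec above:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: prepend a canonical 44-byte WAV header to raw 16-bit mono 8 kHz PCM.
public class RawToWav {
    public static void writeWav(Path rawFile, Path wavFile) throws IOException {
        byte[] pcm = Files.readAllBytes(rawFile);
        ByteBuffer header = ByteBuffer.allocate(44).order(ByteOrder.LITTLE_ENDIAN);
        header.put("RIFF".getBytes());
        header.putInt(36 + pcm.length);  // remaining RIFF chunk size
        header.put("WAVE".getBytes());
        header.put("fmt ".getBytes());
        header.putInt(16);               // fmt sub-chunk size
        header.putShort((short) 1);      // audio format: PCM
        header.putShort((short) 1);      // channels: mono
        header.putInt(8000);             // sample rate
        header.putInt(8000 * 2);         // byte rate = sampleRate * blockAlign
        header.putShort((short) 2);      // block align = channels * 2 bytes
        header.putShort((short) 16);     // bits per sample
        header.put("data".getBytes());
        header.putInt(pcm.length);       // data sub-chunk size
        try (OutputStream out = Files.newOutputStream(wavFile)) {
            out.write(header.array());
            out.write(pcm);
        }
    }
}

A per-track file produced this way should load in Audacity directly, without the raw-import step.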
To sum up, the logic posted in the original question works.