# Recover from Connection Errors and Timeouts in Rev AI Streaming Transcription Sessions

**By Kyle Bridburg, Engineering Manager and Vikram Vaswani, Developer Advocate - May 09, 2022**

## Introduction

Rev AI's [Streaming Speech-to-Text API](/api/streaming) enables real-time transcription for streaming audio. It works with all major English accents and includes key features such as punctuation, capitalization, speaker diarization, custom vocabulary and profanity filtering.

The Streaming Speech-to-Text API can be used with both WebSocket and RTMP streams, with a [time limit of 3 hours per stream](/api/streaming#api-limits). While this is more than sufficient for most scenarios, there are cases where live streams can run longer than 3 hours - for example, live transcription of commentary for a day-long sporting event.

With Rev AI, the recommended practice when a stream approaches the 3-hour limit is to initialize a new concurrent WebSocket connection and switch to it. This sounds simple, but in practice application developers often struggle to implement solutions that handle connection disruptions correctly (whether due to session length timeouts or other network connectivity interruptions).

This tutorial proposes solutions to these challenges, helping developers implement more robust real-time transcription for long-running audio streams.

## Assumptions

This tutorial assumes that:

- You have a Rev AI account and access token. If not, [sign up for a free account](https://www.rev.ai/auth/signup) and [generate an access token](/get-started#step-1-get-your-access-token).
- You have some familiarity with Rev AI's [Streaming Speech-to-Text API](/api/streaming). If not, [familiarize yourself with the basics](/api/streaming/get-started) and learn about making [WebSocket requests](/api/streaming/requests) and [receiving API responses](/api/streaming/responses).
- You have a properly-configured Node.js development environment with Node.js v16.x or v17.x. If not, [download and install Node.js](https://nodejs.org/en/download/) for your operating system.
- You have installed the [Rev AI Node SDK](/sdk/node).
- You have an audio file in RAW format.


## Key challenges

When integrating Rev AI live transcription with long-running audio streams, developers have to be cognizant of the following issues:

### Connection disruption

Rev AI's Streaming Speech-to-Text API sets a time limit per stream of 3 hours. When a stream's 3-hour limit is reached, the streaming connection will be terminated by the API. Apart from this, the streaming connection may also be disrupted due to external factors, such as network failures or bandwidth limitations.

In both these cases, the application will need to initialize a new WebSocket connection and start a new streaming session. Once the new WebSocket connection is accepted and the `connected` message is received, the application can begin streaming audio to it.

### Data loss

When reconnecting to the API for any of the reasons described above, there is invariably a period during which audio data is produced but not transferred to the API for transcription. The application developer needs a strategy in place to avoid losing this audio data during the connection recovery process.

In this case, the application will need to store the audio data in a buffer until such time as the connection to the API is re-established and the data can be sent for (delayed) transcription.
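As a minimal illustration of this buffering strategy (the `AudioBackup` class below is hypothetical, not part of the Rev AI SDK), audio chunks can be pushed to an in-memory backup while the connection is down and flushed in order once it is re-established:

```javascript
// Minimal in-memory backup: buffer chunks while disconnected,
// flush them in order once the connection is re-established.
class AudioBackup {
    constructor() {
        this.pending = [];
    }

    // called for every chunk produced while the connection is down
    store(chunk) {
        this.pending.push(chunk);
    }

    // called once the new WebSocket session is accepted; sends the
    // buffered chunks in order, then clears the backup
    flush(send) {
        for (const chunk of this.pending) {
            send(chunk);
        }
        this.pending = [];
    }
}

// Example usage
const backup = new AudioBackup();
backup.store(Buffer.from('chunk-1'));
backup.store(Buffer.from('chunk-2'));

const sent = [];
backup.flush(chunk => sent.push(chunk.toString()));
console.log(sent); // [ 'chunk-1', 'chunk-2' ]
```

The full example later in this tutorial applies the same idea, with the additional refinement of skipping chunks the API already transcribed before the disconnection.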

### Timestamp corruption

Rev AI's transcripts include timestamps for every transcribed word. Timestamps correspond to when the words are spoken within the audio. Every (re)connection to the API is treated as a new connection, and audio is therefore timestamped starting from `00:00:00`. However, re-aligning the timestamps correctly to the audio stream is a critical task when restarting an interrupted streaming session.

In this case, the application will need to provide a starting timestamp to offset all hypotheses timings by adding `start_ts` as a query parameter to the connection request. This will ensure that output hypotheses have their timestamps offset by the number of seconds provided in the `start_ts` parameter.
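For example, if the last final hypothesis before a disconnection ended at 9,510 seconds, reconnecting with `start_ts=9510` makes the new session's hypotheses line up with the original audio. The arithmetic can be sketched as follows (the helper names here are illustrative, not from the Rev AI SDK):

```javascript
// For raw PCM audio, a timestamp maps directly to a byte offset:
// seconds * samples/second * bytes/sample.
function timestampToByteOffset(seconds, samplesPerSecond, bytesPerSample) {
    return seconds * samplesPerSecond * bytesPerSample;
}

// Build the start_ts query parameter for the reconnection request so
// that the new session's hypotheses are offset to match the audio.
function startTsParam(lastFinalEndTs) {
    return `start_ts=${lastFinalEndTs}`;
}

// 2 seconds of 16 kHz, 16-bit (2-byte) mono audio is 64,000 bytes
console.log(timestampToByteOffset(2, 16000, 2)); // 64000
console.log(startTsParam(9510)); // start_ts=9510
```

The same byte-offset calculation is used in the full example below to work out how much of the backed-up audio has already been transcribed.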

## Technical approach

The following example can be used to configure a streaming client to transcribe a long-duration stream using a RAW-format audio file. It handles reconnects (whether due to session length timeouts or other connectivity interruption) without losing audio. It also re-aligns timestamp offsets to the new streaming session when reconnecting.

To use this example, replace the `<FILEPATH>` placeholder with the path to the audio file (RAW format) you wish to stream and the `<REVAI_ACCESS_TOKEN>` placeholder with your Rev AI account's access token.


```javascript
const fs = require('fs');
const revai = require('revai-node-sdk');
const { Writable } = require('stream');

const token = '<REVAI_ACCESS_TOKEN>';
const filePath = '<FILEPATH>';
const bytesPerSample = 2;
const samplesPerSecond = 16000;
const chunkSize = 8000;

// initialize client with audio configuration and access token
const audioConfig = new revai.AudioConfig(
    /* contentType */ 'audio/x-raw',
    /* layout */      'interleaved',
    /* sample rate */ samplesPerSecond,
    /* format */      'S16LE',
    /* channels */    1
);

// session configuration (all parameters are optional)
const sessionConfig = new revai.SessionConfig(
    'example metadata', /* metadata */
    null,               /* custom_vocabulary_id */
    false,              /* filter_profanity */
    false,              /* remove_disfluencies */
    0,                  /* delete_after_seconds */
    0,                  /* start_ts */
    'machine',          /* transcriber */
    false,              /* detailed_partials */
    'en'                /* language */
);

// begin streaming session
let client = null;
let revaiStream = null;

let audioBackup = [];
let audioBackupCopy = [];
let newStream = true;
let lastResultEndTsReceived = 0.0;

function handleData(data) {
    switch (data.type){
        case 'connected':
            console.log("Received connected");
            break;
        case 'partial':
            console.log(`Partial: ${data.elements.map(x => x.value).join(' ')}`);
            break;
        case 'final':
            console.log(`Final: ${data.elements.map(x => x.value).join('')}`);
            const textElements = data.elements.filter(x => x.type === "text");
            lastResultEndTsReceived = textElements[textElements.length - 1].end_ts;
            // debug: approximate KB of audio represented by the last final timestamp
            console.log(lastResultEndTsReceived * samplesPerSecond * bytesPerSample / 1024);
            break;
        default:
            // all messages from the API are expected to be one of the previous types
            console.error('Received unexpected message');
            break;
    }
}

function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);

    // create event responses
    client.on('close', (code, reason) => {
        console.log(`Connection closed, ${code}: ${reason}`);
        if (code !== 1000 || reason === 'Reached max session lifetime'){
            console.log('Restarting stream');
            restartStream();
        }
        // log the total number of audio bytes streamed so far
        console.log(bytesWritten);
    });
    client.on('httpResponse', code => {
        console.log(`Streaming client received HTTP response with code: ${code}`);
    });
    client.on('connectFailed', error => {
        console.log(`Connection failed with error: ${error}`);
    });
    client.on('connect', connectionMessage => {
        console.log(`Connected with job ID: ${connectionMessage.id}`);
    });

    audioBackup = [];
    sessionConfig.startTs = lastResultEndTsReceived;

    revaiStream = client.start(sessionConfig);
    revaiStream.on('data', data => {
        handleData(data);
    });
    revaiStream.on('end', function () {
        console.log('End of stream');
    });
}

let bytesWritten = 0;

const audioInputStreamTransform = new Writable({
    write(chunk, encoding, next) {
        if (newStream && audioBackupCopy.length !== 0) {
            // approximate how many chunks have already been transcribed,
            // based on the timestamp of the last final hypothesis
            const bytesSent = lastResultEndTsReceived * samplesPerSecond * bytesPerSample;
            const chunksSent = Math.floor(bytesSent / chunkSize);
            if (chunksSent !== 0) {
                for (let i = chunksSent; i < audioBackupCopy.length; i++) {
                    revaiStream.write(audioBackupCopy[i][0], audioBackupCopy[i][1]);
                }
            }
            newStream = false;
        }

        audioBackup.push([chunk, encoding]);

        if (revaiStream) {
            revaiStream.write(chunk, encoding);
            bytesWritten += chunk.length;
        }

        next();
    },

    final() {
        if (client && revaiStream) {
            client.end();
            revaiStream.end();
        }
    }
});

function restartStream() {
    if (revaiStream) {
        client.end();
        revaiStream.end();
        revaiStream.removeListener('data', handleData);
        revaiStream = null;
    }

    audioBackupCopy = [];
    audioBackupCopy = audioBackup;

    newStream = true;

    startStream();
}

// read file from disk
let file = fs.createReadStream(filePath);

startStream();

file.on('end', () => {
    chunkInputTransform.end();
})

// holds data left over after chunking writes into 8000-byte chunks
let leftOverData = null;

const chunkInputTransform = new Writable({
    write(chunk, encoding, next) {
        if (encoding !== 'buffer'){
            console.log(`${encoding} is not buffer, writing directly`);
            audioInputStreamTransform.write(chunk, encoding);
        }
        else {
            let position = 0;

            if (leftOverData != null) {
                let audioChunk = Buffer.alloc(chunkSize);
                const copiedAmount = leftOverData.length;
                console.log(`${copiedAmount} left over, writing with next chunk`);
                leftOverData.copy(audioChunk);
                leftOverData = null;
                chunk.copy(audioChunk, copiedAmount, 0, chunkSize - copiedAmount);
                position += chunkSize - copiedAmount;
                audioInputStreamTransform.write(audioChunk, encoding);
            }

            while(chunk.length - position > chunkSize) {
                console.log(`${chunk.length - position} bytes left in chunk, writing with next audioChunk`);
                let audioChunk = Buffer.alloc(chunkSize);
                chunk.copy(audioChunk, 0, position, position+chunkSize);
                position += chunkSize;
                audioInputStreamTransform.write(audioChunk, encoding);
            }

            if (chunk.length - position > 0) {
                leftOverData = Buffer.alloc(chunk.length - position);
                chunk.copy(leftOverData, 0, position);
            }
        }

        next();
    },

    final() {
        if (leftOverData != null) {
            audioInputStreamTransform.write(leftOverData);
            leftOverData = null;
        }
        audioInputStreamTransform.end();
    }
})

// stream the file
file.pipe(chunkInputTransform);
```

This code sample is illustrative and not intended for production use.

The following sections explain this code listing with reference to the specific problems described earlier.

### Connection disruption

Refer to the following code segments:


```javascript
function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);

    client.on('close', (code, reason) => {
        console.log(`Connection closed, ${code}: ${reason}`);
        if (code !== 1000 || reason === 'Reached max session lifetime'){
            console.log('Restarting stream');
            restartStream();
        }
    });

    // ...

    revaiStream = client.start(sessionConfig);

    // ...
}

function restartStream() {
    if (revaiStream) {
        client.end();
        revaiStream.end();
        revaiStream.removeListener('data', handleData);
        revaiStream = null;
    }

    // ...

    newStream = true;

    startStream();
}
```

The `startStream()` function creates a new Rev AI streaming client and initializes a streaming session as `revaiStream`. It also defines an event handler for the WebSocket `close` event, which can fire either because of a connectivity failure or because the stream reached its session time limit. Unless the session closed normally, this handler invokes `restartStream()`, which cleans up the terminated `revaiStream` session and starts a new one.

### Data loss

Refer to the following code segments:


```javascript
let audioBackup = [];
let audioBackupCopy = [];

const audioInputStreamTransform = new Writable({
    write(chunk, encoding, next) {
        if (newStream && audioBackupCopy.length !== 0) {

            // ...

            if (chunksSent !== 0) {
                for (let i = chunksSent; i < audioBackupCopy.length; i++) {
                    revaiStream.write(audioBackupCopy[i][0], audioBackupCopy[i][1]);
                }
            }
            newStream = false;
        }

        audioBackup.push([chunk, encoding]);

        // ...
    },

    // ...
});

function restartStream() {

    // ...

    audioBackupCopy = [];
    audioBackupCopy = audioBackup;

    newStream = true;

    startStream();
}
```

Here, `audioBackup` acts as a backup store for the streamed audio. If a streaming session ends unexpectedly, two things are needed to restart it without data loss:

- A backup of the audio already streamed, to resend from so that no data is lost
- A fresh backup for the restarted stream


When a stream is restarted with the `restartStream()` function, the contents of `audioBackup` are copied into `audioBackupCopy` and then cleared in readiness for the new backup. Data is then sent to the `revaiStream` streaming session from `audioBackupCopy`.

### Timestamp corruption

Refer to the following code segments:


```javascript
let lastResultEndTsReceived = 0.0;

function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);

    // ...

    sessionConfig.startTs = lastResultEndTsReceived;
    revaiStream = client.start(sessionConfig);
    revaiStream.on('data', data => {
        handleData(data);
    });

    // ...
}

function handleData(data) {
    switch (data.type){

        // ...

        case 'final':
            const textElements = data.elements.filter(x => x.type === "text");
            lastResultEndTsReceived = textElements[textElements.length - 1].end_ts;
            break;

        // ...
    }
}
```

Here, the `lastResultEndTsReceived` variable holds the last timestamp received, updated with each final hypothesis. When the streaming session restarts, the `start_ts` parameter is set to the value of `lastResultEndTsReceived` to re-align the new session's timestamps with the audio stream.

One important point to note here is that this approach can result in some audio being resent to the API. Since only final hypotheses carry timestamps, all audio since the last final hypothesis is resent, which can lead to a small number of words being duplicated in the transcript.

## Next steps

Transcribing live audio comes with numerous challenges around connection recovery, data protection and timestamp alignment. For developers working with Rev AI's Streaming Speech-to-Text API, this tutorial provided a technical approach and sample implementation to resolve these challenges.

Learn more about the topics discussed in this tutorial by visiting the following links:

- Documentation: Streaming Speech-to-Text API [overview](/api/streaming) and [code samples](/api/streaming/code-samples)
- Documentation: Streaming Speech-to-Text [example session](/api/streaming/example-session)
- Documentation: [Node SDK](/sdk/node)
- Tutorial: [Best Practices for the Rev AI APIs](/resources/tutorials/api-best-practices)