Convert function using callbacks into Async Iterator variant

Scenario

I'm given a function with an asynchronous callback, like:

let readFile: (path: string, callback: (line: string, eof: boolean) => void) => void

However, I would prefer a function with an AsyncIterable/AsyncGenerator signature instead:

let readFileV2: (path: string) => AsyncIterable<string>

Problem

Without readFileV2, I have to read a file like this:

let file = await new Promise((res, rej) => {
    let contents = ''
    readFile('./myfile.txt', (line, eof) => {
        if (eof) { return res(contents) }
        contents += line + '\n'
    })
})

…while readFileV2 would let me do it more cleanly:

let file = '';
for await (let line of readFileV2('./myfile.txt')) {
    file += line + '\n'
}

Question

Is there a way for me to transform readFile into readFileV2?

Updated for clarification:

Is there a general approach to transform a function with an async callback argument to an AsyncGenerator/AsyncIterable variant?

And can this approach be demonstrated on the readFile function above?

References

I see two related questions here, but they don't seem to provide a clear answer.

4 Answers

Answer by jcalz (accepted)

Disclaimer at the outset: I am answering the following question:

Given a data-providing function fn of a form like (...args: A, callback: (data: T, done: boolean) => void) => void, for some list of initial argument types A and data type T, how can we write a transform such that transform(fn) produces a new function of the form (...args: A) => AsyncIterable<T>?

It is quite possible that this isn't the right thing to be doing in general, since consumers of AsyncIterable<T> may process data slowly or abort early, and a function of type (...args: [...A, (data: T, done: boolean) => void]) => void can't possibly react to that; it will call callback once per piece of data, whenever it wants, and it will not stop until it feels like it.
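
For example (a hypothetical sketch, assuming readFileV2 is a transformed version of the readFile from the question), a consumer might abandon the iteration early, and the callback-based producer has no way to notice:

for await (const line of readFileV2('./myfile.txt')) {
    if (line.includes('ERROR')) break; // we stop consuming here...
}
// ...but readFile keeps invoking its callback for the rest of the file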


Still, here is one possible implementation:

const transform = <A extends any[], T>(
    fn: (...args: [...args: A, callback: (val: T, done: boolean) => void]) => void
) => (...args: A): AsyncIterable<T> => {
    // Queue of promises, each of which resolves to a [value, done] pair as data arrives
    let values: Promise<[T, boolean]>[] = [];
    let resolve!: (x: [T, boolean]) => void; // resolver for the promise at the tail of the queue
    values.push(new Promise(r => { resolve = r; }));
    fn(...args, (val: T, done: boolean) => {
        resolve([val, done]); // fulfill the current tail promise...
        values.push(new Promise(r => { resolve = r; })); // ...and arm a fresh one
    });
    return async function* () {
        let val: T;
        for (let i = 0, done = false; !done; i++) {
            [val, done] = await values[i];
            delete values[i]; // free the consumed slot for garbage collection
            yield val;
        }
    }();
}

Essentially we provide a queue of data values, values, which gets written to inside the callback passed to fn and read from inside a generator function. This is accomplished by a chain of promises: the first promise is created manually, and each time data is available, the callback resolves the current promise and pushes a fresh promise onto the queue. The generator function awaits these promises in order, pulls data off the queue, and removes each consumed entry.
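
To see the handoff in isolation, here is a stripped-down sketch of the same pattern (the identifiers here are mine, not from the code above):

let resolveNext!: (value: string) => void;
const pending: Promise<string>[] = [new Promise(r => { resolveNext = r; })];

// Producer side: fulfill the promise the consumer is currently awaiting,
// then immediately arm a fresh promise at the tail of the queue.
function onData(value: string) {
    resolveNext(value);
    pending.push(new Promise(r => { resolveNext = r; }));
}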


To test it, someone needs to provide fn. Here's one possibility:

function sleep(ms: number) {
    return new Promise<void>(r => setTimeout(r, ms));
}

const provideData = async (name: string, callback: (line: string, eof: boolean) => void) => {
    const contents = [
        "This is line 1 of " + name, "and this is line 2",
        "and line 3", "and 4", "5",
        "and that's the end of " + name + "."
    ];
    for (const [line, eof] of contents.map((l, i, a) => [l, i >= a.length - 1] as const)) {
        await sleep(1000); // I guess it takes a second to read each line
        callback(line, eof);
    }
}

The provideData function accepts a callback and calls it once per second with successive lines of an array. And now we transform it:

const provideDataV2 = transform(provideData);
// let provideDataV2: (name: string) => AsyncIterable<string>
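
Presumably the same one-liner applies to the readFile from the question (my extrapolation, not shown in the original answer); note that this transform yields the value delivered alongside eof === true as a final line, whereas the Promise-based snippet in the question discards it:

const readFileV2 = transform(readFile);
// let readFileV2: (path: string) => AsyncIterable<string>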

And let's test the transformer:

async function foo() {
    console.log(new Date().toLocaleTimeString(), "starting")
    const iter = provideDataV2("my data");
    await sleep(2500); // not ready to read yet, I guess    
    for await (let line of iter) {
        console.log(new Date().toLocaleTimeString(), line)
    }
    console.log(new Date().toLocaleTimeString(), "done")
}
foo()

/* 
[LOG]: "2:48:36 PM",  "starting" 
[LOG]: "2:48:37 PM",  "This is line 1 of my data" 
[LOG]: "2:48:38 PM",  "and this is line 2" 
[LOG]: "2:48:39 PM",  "and line 3" 
[LOG]: "2:48:40 PM",  "and 4" 
[LOG]: "2:48:41 PM",  "5" 
[LOG]: "2:48:42 PM",  "and that's the end of my data." 
[LOG]: "2:48:42 PM",  "done" 
*/

Looks good.

Is it perfect? Does it have weird side effects in weird situations (e.g., what happens if you iterate it multiple times)? Should it handle errors in a particular way? Are there recommended solutions elsewhere? Not sure. This is just a possible implementation of transform that adheres to the contract laid out in the question as asked.

Playground link to code

Answer by vitaly-t

This has been a native Node.js API since v10; there's no need to reinvent it:

import {createReadStream} from 'fs';
import {createInterface} from 'readline';

function readFileLines(fileName: string): AsyncIterable<string> {
    const input = createReadStream(fileName);
    return createInterface({input, crlfDelay: Infinity});
}

Testing it (wrapped in an async IIFE, since top-level await is only available in ES modules):

const lines = readFileLines('./test1.js');

(async () => {
    for await (const l of lines) {
        console.log(l);
    }
})();
Answer by Redu

Yes.

I did this for Deno.serve, an HTTP server that takes a callback and an options object, like Deno.serve(req => respondWith(req), {port: 3000}).

Basically, the code is:

async function* emitterGen(opts){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  Deno.serve( req => ( _resolve(req)
                     , _req = new Promise((resolve,reject) => _resolve = resolve)
                     )
            , opts
            );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen({port: 3000});

for await (let req of reqEmitter){
  respondWith(req);
}

Obviously the code above is simplified, without exception handling. Yet it should be sufficient to answer your question.

Here is a working mock-up server which creates a random number (0-99) as the request (req) at a random interval (0-999 ms) and invokes cb (the handler) with req. It stops after 5 iterations.

function server(cb,ms){
  let count  = 5,
      looper = function(c = count,t = ms){
                 let stoid = setTimeout( req => ( cb(req)
                                                , --c && looper(c, Math.random()*1000 >>> 0)
                                                , clearTimeout(stoid)
                                                )
                                       , t
                                       , Math.random()*100 >>> 0
                                       )
               }
  looper();
}

async function* emitterGen(ms){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  server( req => ( _resolve(req)
                 , _req = new Promise((resolve,reject) => _resolve = resolve)
                 )
        , ms
        );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen(1000);

// since there is no top level await in SO snippets
(async function(){
  for await (let req of reqEmitter){
    console.log(`Received request is: ${req}`);
  }
})();

Answer by Alex

I created a class that can produce an async generator from any source:

/** Infinite async generator. Iterates messages pushed to it until closed. */
class Machina<T> {

  #open = true;
  #queue: T[] = [];
  #resolve: (() => void) | undefined;

  async * stream(): AsyncGenerator<T> {
    this.#open = true;

    while (this.#open) {
      if (this.#queue.length) {
        yield this.#queue.shift()!;
        continue;
      }

      await new Promise<void>((_resolve) => {
        this.#resolve = _resolve;
      });
    }
  }

  push(data: T): void {
    this.#queue.push(data);
    this.#resolve?.();
  }

  close(): void {
    this.#open = false;
    this.#resolve?.();
  }

}

export { Machina };

You can use it like this:

// Create the Machina instance
const machina = new Machina<string>();

// Async generator loop
async function getMessages() {
  for await (const msg of machina.stream()) {
    console.log(msg);
  }
}

// Start the generator
getMessages();

// Push messages to it
machina.push('hello!');
machina.push('whats up?');
machina.push('greetings');

// Stop the generator
machina.close();

For your specific case, something like this should work:

/** Read each line of the file as an AsyncGenerator. */
function readFileAsync(path: string): AsyncGenerator<string> {
  const machina = new Machina<string>();

  readFile(path, (line: string, eof: boolean) => {
    if (eof) {
      machina.close();
    } else {
      machina.push(line);
    }
  });

  return machina.stream();
}

// Usage
for await (const line of readFileAsync('file.txt')) {
  console.log(line);
}

How it works

  1. Calling machina.stream() kicks off an infinite loop, but it becomes paused immediately (on the first iteration) because it's waiting for a promise that isn't resolved.
  2. Calling machina.push() adds an item to the buffer and unpauses the loop by resolving that promise. Once unpaused, the loop empties the buffer into the stream and then pauses again by awaiting a new promise.
  3. The consumer of machina.stream() receives the pushed items. You can do this repeatedly.

Other considerations:

  • Even if the Machina instance goes out of scope (i.e. is ready to be garbage-collected), its pending promise stays unresolved forever. So you need to call machina.close() manually when you are done streaming. Simply breaking out of the loop is not enough (see the sketch after this list)!
  • I tried a slightly different design at first, without a buffer. It turns out the buffer is needed; otherwise you cannot push multiple items in the same tick (all but the first get dropped). But if you're only pushing one item per tick and are actively consuming the stream, the buffer will only ever contain one item, so this is just a small memory consideration.
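
To make that first point concrete, a defensive consumer could pair the loop with try/finally (my sketch, reusing the Machina class above):

const machina = new Machina<string>();

async function consume() {
  try {
    for await (const msg of machina.stream()) {
      console.log(msg);
      if (msg === 'bye') break; // breaking alone would leave Machina's pending promise unresolved
    }
  } finally {
    machina.close(); // ensure the dangling promise is resolved, as the first point advises
  }
}

consume();
machina.push('hi');
machina.push('bye');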