Perl Mojolicious: limiting concurrency with async/await and all_settled


In a full Mojolicious app, I have the following pattern:

  1. Run some legacy requests concurrently (e.g. insert into db1 & db2 at the same time); and
  2. Run some legacy requests sequentially (e.g. for each db, insert table1 first, then table2 afterwards, etc).

Note: for the example below I used the httpbin.org Docker image:

docker run --rm --name httpbin -p 8000:80 kennethreitz/httpbin

How can I use Mojo::Promise->map to limit concurrency for this pattern? Can the limit apply to all my actions in db1 and db2 combined? In my example below, is it possible to limit the number of requests hitting http://localhost:8000 to, say, only 3 at any given moment?

use Mojolicious::Lite -signatures, -async_await;
use Mojo::Util qw(dumper);

helper db1_LocationA_p => async sub ($self, $request) {
    return Mojo::Promise->new(sub ($resolve, $reject) {
        Mojo::IOLoop->subprocess(
            sub {
                # Runs in a child process, so a blocking request is fine here
                my $tx  = $self->ua->post('http://localhost:8000/delay/2' => json => $request);
                my $res = $tx->result;
                die $res->message if $res->is_error;
                $res->json;
            },
            sub {
                my ($subprocess, $err, @res) = @_;
                # Do not also resolve after rejecting
                return $reject->($err) if $err;
                $resolve->(@res);
            }
        );
    });
};

helper db1_LocationB_p => async sub ($self, $request) {
    return $self->db1_LocationA_p("LocationB $request"); # For brevity
};

helper db2_LocationA_p => async sub ($self, $request) {
    return Mojo::Promise->new(sub ($resolve, $reject) {
        Mojo::IOLoop->subprocess(
            sub {
                # Runs in a child process, so a blocking request is fine here
                my $tx  = $self->ua->post('http://localhost:8000/delay/5' => json => $request);
                my $res = $tx->result;
                die $res->message if $res->is_error;
                $res->json;
            },
            sub {
                my ($subprocess, $err, @res) = @_;
                # Do not also resolve after rejecting
                return $reject->($err) if $err;
                $resolve->(@res);
            }
        );
    });
};

helper db2_LocationB_p => async sub ($self, $request) {
    return $self->db2_LocationA_p("LocationB $request"); # For brevity
};

helper add_db1 => async sub($self, $table1, $table2, $table3) {
    # run sequentially. table1 first, then table2, then table3
    my @table1 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table1 );
    my @table2 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table2 );
    my @table3 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table3 );
    return (@table1, @table2, @table3);
};

helper add_db2 => async sub ($self, $table1, $table2) {
    # run sequentially. table1 first, then table2
    my @table1 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_),  $self->db2_LocationB_p($_) } @$table1 );
    my @table2 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_),  $self->db2_LocationB_p($_) } @$table2 );
    return (@table1, @table2);
};

any '/' => async sub ($self) {
    my $param = $self->param('param');

    my ($db1_table1, $db1_table2, $db1_table3, $db2_table1, $db2_table2);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=FOO${param};);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAR${param};);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAZ${param};);
    push @$db1_table2, qq(ADD DB1 TABLE2 : ID=ABC, IDs = FOO${param}, BAR${param}, BAZ${param};);
    push @$db1_table2, qq(ADD DB1 TABLE2 : ID=XYZ, IDs = FOO${param}, BAR${param}, BAZ${param};);
    push @$db1_table3, qq(ADD DB1 TABLE3 : ID=ZZZ ,IDs = ABC, XYZ;);

    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=FOO${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAR${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAZ${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=QUX${param};);
    push @$db2_table2, qq(ADD DB2 TABLE2 : ID=FOO, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
    push @$db2_table2, qq(ADD DB2 TABLE2 : ID=BAR, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
    
    $self->render_later();
    my @results = eval {
        await Mojo::Promise->all(
            # run concurrently. db1 & db2 can run in parallel at the same time.
            $self->add_db1($db1_table1, $db1_table2, $db1_table3),
            $self->add_db2($db2_table1, $db2_table2),
            )};
    
    if (my $err = $@) {
        warn "Something went wrong: " . dumper($err);
        $self->render(json => $err, status=>502 );
    } else {
        say STDERR dumper(@results);
        $self->render(json => {db1=>$results[0], db2=>$results[1]});
    }
};

app->start;

In the example above:

  • add_db1() would immediately make 3 requests to ADD DB1 TABLE1 on db1_LocationA and another 3 on db1_LocationB; and
  • add_db2() would immediately make 4 requests to ADD DB2 TABLE1 on db2_LocationA and another 4 on db2_LocationB.

That is a total of 14 simultaneous requests from add_db1() and add_db2() combined. My question is whether it is possible to limit that to, say, 3 in total.
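One way to get a single cap across add_db1() and add_db2() is a shared limiter that every request passes through, because the concurrency option of Mojo::Promise->map only applies within one map call. The standalone script below is a minimal sketch under stated assumptions, not a drop-in fix: `limited`, `fake_request_p` and `add_db` are hypothetical names, and Mojo::Promise->timer stands in for the real requests to http://localhost:8000. It keeps the question's structure (databases in parallel, tables in sequence) while never running more than 3 requests at once.

```perl
use Mojo::Base -strict, -signatures, -async_await;
use Mojo::Promise;

# Hypothetical shared limiter: at most $LIMIT tasks run at once,
# regardless of which database or table they belong to.
my $LIMIT     = 3;
my $in_flight = 0;
my $max_seen  = 0;
my $completed = 0;
my @waiting;    # gate promises for tasks still waiting for a free slot

# Open gates while slots are free.
sub _next_slot () {
  while ($in_flight < $LIMIT && @waiting) {
    $in_flight++;
    $max_seen = $in_flight if $in_flight > $max_seen;
    (shift @waiting)->resolve;
  }
}

# limited(sub { ... return a promise ... }) returns a promise that settles
# like the task, but the task only starts once a slot is available.
sub limited ($task) {
  my $gate = Mojo::Promise->new;
  push @waiting, $gate;
  _next_slot();
  return $gate->then(sub { $task->() })
    ->finally(sub { $in_flight--; _next_slot() });
}

# Hypothetical stand-in for a real request such as $ua->post_p(...).
sub fake_request_p ($label) {
  return limited(sub {
    Mojo::Promise->timer(0.05 => "done: $label")
      ->then(sub ($value) { $completed++; $value });
  });
}

# Tables of one database run one after another; rows within a table run together.
async sub add_db ($name, @tables) {
  my @results;
  for my $table (@tables) {
    push @results, await Mojo::Promise->all(
      map { fake_request_p("$name A $_"), fake_request_p("$name B $_") } @$table);
  }
  return @results;
}

# Both databases run concurrently, but share the same limit of 3.
Mojo::Promise->all(
  add_db('db1', [qw(FOO BAR BAZ)], [qw(ABC XYZ)], ['ZZZ']),
  add_db('db2', [qw(FOO BAR BAZ QUX)], [qw(FOO BAR)]),
)->wait;

say "completed $completed requests, at most $max_seen at once";
```

In the app itself, the same idea would mean wrapping the Mojo::Promise->new(...) in db1_LocationA_p and db2_LocationA_p (the helpers that actually issue requests; the LocationB helpers delegate to them) in `limited(sub { ... })`, so the cap covers every request whichever helper starts it. Wrapping the LocationB helpers as well would make each of their requests hold two slots.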

Answer by spongi

I have some experience making HTTP requests with Mojo::Base. I don't quite follow your code, but this is how I usually make concurrent HTTP requests:

#!/usr/bin/perl

use Mojo::Base qw(-strict -signatures -async_await);
use Mojo::Promise;
use Mojo::UserAgent;

my $ua = Mojo::UserAgent->new;
my @urls = map{"http://ffuf.me/cd/pipes/user?id=$_"} 300..1000;

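async sub get_pp($url) {
    my $tx = await $ua->get_p($url);

    my $body = $tx->res->body;
    say $tx->req->url;

    # Stop as soon as one id returns something other than "Not Found"
    if ($body !~ /Not Found/i) {
        say $tx->req->url . " " . $body;
        exit;
    }
}

async sub main(@urls) {
    # At most 20 get_pp() calls (and thus requests) run at the same time
    await Mojo::Promise->map({concurrency => 20}, sub { get_pp($_) }, @urls);
}

# Start the event loop and wait until main() has finished
main(@urls)->wait;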

This fuzzes the id parameter from 300 to 1000, with at most 20 HTTP requests in flight at once.
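As a quick check of how the concurrency option behaves, the sketch below counts how many requests are running at the same time. `fake_request_p` is a hypothetical stand-in that uses a Mojo::Promise->timer instead of a real HTTP call. It shows that the limit applies to each Mojo::Promise->map call separately, which is why it cannot by itself cap the total across add_db1() and add_db2() in the question.

```perl
use Mojo::Base -strict, -signatures;
use Mojo::Promise;

my ($in_flight, $max_seen) = (0, 0);

# Hypothetical stand-in for an HTTP request: a 50 ms timer that also
# records how many "requests" are running at the same time.
sub fake_request_p ($item) {
  $in_flight++;
  $max_seen = $in_flight if $in_flight > $max_seen;
  return Mojo::Promise->timer(0.05 => $item)->finally(sub { $in_flight-- });
}

# One map call: never more than 3 requests in flight.
Mojo::Promise->map({concurrency => 3}, sub { fake_request_p($_) }, 1 .. 10)->wait;
my $single = $max_seen;

# Two map calls running side by side: each is capped at 3 on its own,
# so up to 6 requests can be in flight overall.
($in_flight, $max_seen) = (0, 0);
Mojo::Promise->all(
  Mojo::Promise->map({concurrency => 3}, sub { fake_request_p($_) }, 1 .. 10),
  Mojo::Promise->map({concurrency => 3}, sub { fake_request_p($_) }, 11 .. 20),
)->wait;
my $combined = $max_seen;

say "one map: at most $single in flight; two maps: at most $combined";
```

The items start at 1 rather than 0 on purpose: Mojo::Promise->map takes the next item with a truthiness check, so a falsy item such as 0 can end the queue early.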

I have more examples like this in my GitHub repo: https://github.com/spoNge369/perl_scrap

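Before modifying any of the code you provided, you could experiment with these Mojo::UserAgent attributes (note that max_connections limits how many keep-alive connections are cached, not how many requests run concurrently):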

  • connect_timeout()
  • inactivity_timeout()
  • max_connections()
  • request_timeout()

For example, applied to the $ua (user agent): a total limit of 5 seconds, of which up to 3 seconds may be spent connecting, with redirects disabled:

$ua->max_redirects(0)->connect_timeout(3)->request_timeout(5);

https://docs.mojolicious.org/Mojo/UserAgent#request_timeout
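The four attributes listed above can be set together on one user agent, since Mojo::UserAgent attribute setters return the object and can be chained. In this sketch only the 3 s / 5 s values come from the documentation example; the inactivity_timeout and max_connections values are arbitrary illustrations.

```perl
use Mojo::Base -strict;
use Mojo::UserAgent;

# Setters return the user agent, so they can be chained.
my $ua = Mojo::UserAgent->new
  ->max_redirects(0)        # do not follow redirects
  ->connect_timeout(3)      # up to 3 s to establish a connection
  ->request_timeout(5)      # 5 s in total per request, connecting included
  ->inactivity_timeout(10); # close connections idle for 10 s (illustrative value)

# Number of cached keep-alive connections (illustrative value);
# this does not limit how many requests run at once.
$ua->max_connections(10);

say join ', ',
  'connect_timeout=' . $ua->connect_timeout,
  'request_timeout=' . $ua->request_timeout,
  'inactivity_timeout=' . $ua->inactivity_timeout,
  'max_connections=' . $ua->max_connections;
```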