Angular HttpClient GET parsing error for application/x-ndjson

706 Views Asked by At

I am trying to consume a Spring boot webfulx reactive api in my angular but I am getting below error in browser developer console.

{error: SyntaxError: Unexpected token { in JSON at position 231 at JSON.parse

The api produces application/x-ndjson and I am not sure if HttpClient is unable to parse the response.

My Service Class:

export class UserInfoService {
  private baseUrl = "http://localhost:9095/api/userinfo";

  private headers= new HttpHeaders()
    .set('content-type', 'application/json')
    .set('Access-Control-Allow-Origin', '*')

  constructor(private http: HttpClient) {}

  getUsers(): Observable<UserInfo[]> {
    return this.http.get<UserInfo[]>(
      this.baseUrl + '/users', {'headers':this.headers});
  }
}

My component class

export class DashboardComponent implements OnInit {
  count: any = 0;
  service: HttpServiceService;
  usersList: Array<UserInfo> | undefined;

  constructor(service: HttpServiceService) {
    this.service = service;
  }

  ngOnInit(): void {
    console.log("---ngOnInit()---");
    this.service.getUsers().subscribe({
      next: (result: any) => {
        console.log("||||Response successful");
        this.usersList?.push(result);
        console.log(result);        
      },
      error: (err: any) => {
        console.log(err);
      },
      complete: () => {
        console.log('complete');
      }
    });
  }
}

I want to display the data in the template table reactively. I see the below error in browser console:

@GetMapping(path="/users", produces = "application/x-ndjson")
public Flux<UserInfo> getAll() {
    return userInfoService.getAll();
}

enter image description here

2

There are 2 best solutions below

0
Mauro Aguilar On

ndjson format is not a valid application/json format and that's why you get that error when the HttpClient tries to parse the response.

The solution for this is to request the content as plain text and parse the content afterwards. You can achieve this by sending responseType: 'text' when making the request:

export class UserInfoService {
  private baseUrl = "http://localhost:9095/api/userinfo";

  private headers= new HttpHeaders()
    .set('Access-Control-Allow-Origin', '*')

  constructor(private http: HttpClient) {}

  // here is a simple ndjson parser implementation I found:
  parse(data: string): any[] {
    if (typeof data !== 'string')
      throw new Error(`Unexpected type ${type}`);
    const rows = data.split(/\n|\n\r/).filter(Boolean);
    return rows.map((row) => JSON.parse(row));
  }

  getUsers(): Observable<UserInfo[]> {
    return this.http.get<UserInfo[]>(this.baseUrl + '/users', {
      headers: this.headers,
      responseType: 'text'
    }).pipe(map(this.parse));
  }
}
0
Yasammez On

I wrote a an interceptor for that specific task and switched to fetch, since the angular HttpClientstill is unable to properly frame streaming data.

import { Injectable } from '@angular/core';
import { HttpContextToken, HttpHandler, HttpInterceptor, HttpRequest, HttpResponse } from '@angular/common/http';
import { catchError, defer, of, filter, from, map, mergeMap, Observable, repeat, scan, tap, timer } from 'rxjs';

export const STREAMING = new HttpContextToken<StreamObserver | null>(() => null);

export type StreamObserver = {
    connected: () => void;
    disconnected: () => void;
};

@Injectable()
export class HttpStreamingInterceptor implements HttpInterceptor {
    constructor() {}

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    intercept(request: HttpRequest<any>, next: HttpHandler): Observable<any> {
        const observer = request.context.get(STREAMING);
        if (!observer) {
            // only act if we should
            return next.handle(request);
        }
        // defer so we can retry on a connection reset
        return defer(() => {
            // notify that we're disconnected - I recommend a debounce() somewhere in the pipe
            // when acting on this
            observer.disconnected();
            return from(
                fetch(request.url, {
                    credentials: 'include',
                    cache: 'no-store',
                    headers: request.headers.keys().map((k): [string, string] => [k, request.headers.get(k) ?? '']),
                })
            );
        }).pipe(
            filter(r => r.ok),
            filter((r): r is typeof r & { body: ReadableStream } => !!r.body),
            // Now we have an Observable<ReadableStream<Uint8Array>>
            // Make that an Observable<Uint8Array>
            mergeMap(r => from(r.body)),
            // Notify that we're connected
            tap(_ => observer.connected()),
            // Don't throw, just complete on errors
            catchError(_ => of()),
            // Once we're done (either by completion or error), go again – this is a stream after all!
            repeat({ delay: () => timer(5000) }),
            map(arr => new TextDecoder('utf-8').decode(arr)),
            // Buffer strings that haven't seen a newline yet, but forget
            // everything we've already emitted!
            scan(
                (acc, curr) => {
                    const newBuf = acc.buffer + curr;
                    const emit = newBuf.split('\n');
                    const buffer = emit.pop() ?? '';
                    return { buffer, emit };
                },
                { buffer: '', emit: [] as string[] }
            ),
            mergeMap(x => x.emit),
            filter((x): x is string => x !== null),
            // Produce nice little HttpResponse events with our separate JSON objects
            map(x => new HttpResponse({ body: JSON.parse(x) }))
        );
    }
}

This can then be used with the provided context token as for example

            http.get('/api/json-stream', { observe: 'response',
                    context: new HttpContext().set(STREAMING, {
                        connected: () => this.store.dispatch(ConnectionEvent()),
                        disconnected: () => this.store.dispatch(DisconnectionEvent()),
                    }),
                });

Note that since the interceptor basically creates an infinitely retrying Observable, the only way to stop it, is unsubscribing. Most effects do this already implicitly by virtue of having a switchMap somewhere in their pipe.