Using RxJs and Angular 2 in order to deal with server-sent events

balteo picture balteo · Apr 23, 2016 · Viewed 14.5k times · Source

I am trying to display server-sent events emitted values in an angular 2 /RxJs app.

The backend regularly sends individual strings to the client through server-sent events.

I am not sure how to deal with the retrieved values on the angular 2/RxJs side.

Here is my client (a ng component):

import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable}     from 'rxjs/Observable';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings | async">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    constructor(private http:Http) {
    }

    errorMessage:string;
    someStrings:string[];

    ngOnInit() {
        this.getSomeStrings()
            .subscribe(
                aString => this.someStrings.push(aString),
                error => this.errorMessage = <any>error);
    }

    private getSomeStrings():Observable<string> {
        return this.http.get('interval-sse-observable')
            .map(this.extractData)
            .catch(this.handleError);
    }

    private extractData(res:Response) {
        if (res.status < 200 || res.status >= 300) {
            throw new Error('Bad response status: ' + res.status);
        }
        let body = res.json();
        return body || {};
    }

    private handleError(error:any) {
        // In a real world app, we might send the error to remote logging infrastructure
        let errMsg = error.message || 'Server error';
        console.error(errMsg); // log to console instead
        return Observable.throw(errMsg);
    }
}

The backend method is as follows (and uses RxJava):

   @ResponseStatus(HttpStatus.OK)
   @RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
    public SseEmitter tickSseObservable() {
        return RxResponse.sse(
                Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                        .map(tick -> randomUUID().toString())
        );
    }

I just noticed that the app hangs on the request and that nothing is displayed on the page.

I suspect there is an issue with my use of the map method i.e. .map(this.extractData).

I would just like to add the incoming strings to the array and display that array in the template which would update as the strings come in.

Can anyone please help?

edit: Here is a working solution (thanks to Thierry's answer below):

import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    someStrings:string[] = [];

    ngOnInit() {
        let source = new EventSource('/interval-sse-observable');
        source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
    }
}

Answer

abahet picture abahet · Dec 29, 2017

Here is a working example :

SseService

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';

declare var EventSource;

@Injectable()
export class SseService {

    constructor() {
    }

    observeMessages(sseUrl: string): Observable<string> {
        return new Observable<string>(obs => {
            const es = new EventSource(sseUrl);
            es.addEventListener('message', (evt) => {
                console.log(evt.data);
                obs.next(evt.data);
            });
            return () => es.close();
        });
    }
}

AppComponent

import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>Angular Server-Sent Events</h1>
    <ul>
        <li *ngFor="let message of messages">
             {{ message }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit, OnDestroy {
    private sseStream: Subscription;
    messages:Array<string> = [];

    constructor(private sseService: SseService){
    }

    ngOnInit() {
        this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
                        .subscribe(message => {
                            messages.push(message);
                        });
    }

    ngOnDestroy() {
        if (this.sseStream) {
            this.sseStream.unsubscribe();
        }
    }
}