Skip to content

Commit

Permalink
perf(platform/messaging): subscribe for replies when posting the requ…
Browse files Browse the repository at this point in the history
…est in request-response communication

Registration for responses in a request-response communication can be done directly when sending the request to the broker. No additional interaction with the broker is required. This reduces the number of interactions with the broker and ensures the requestor is registered for responses when the request is dispatched.
  • Loading branch information
danielwiehl authored and Marcarrian committed Nov 2, 2022
1 parent 17bddcf commit cb23da3
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import {MicrofrontendPlatform} from '../../microfrontend-platform';
import {Beans} from '@scion/toolkit/bean-manager';
import {TopicMessage} from '../../messaging.model';
import {ObserveCaptor} from '@scion/toolkit/testing';
import {BrokerGateway} from './broker-gateway';
import {MessagingChannel} from '../../ɵmessaging.model';
import {BrokerGateway, SubscriptionDescriptor} from './broker-gateway';
import {IntentSubscribeCommand, MessagingChannel, TopicSubscribeCommand} from '../../ɵmessaging.model';

describe('BrokerGateway', () => {

Expand All @@ -39,7 +39,33 @@ describe('BrokerGateway', () => {

// GIVEN
const captor = new ObserveCaptor();
Beans.get(BrokerGateway).subscribeToTopic$('topic').subscribe(captor);
const descriptor: SubscriptionDescriptor = {
messageChannel: MessagingChannel.Topic,
subscribeChannel: MessagingChannel.TopicSubscribe,
unsubscribeChannel: MessagingChannel.TopicUnsubscribe,
newSubscribeCommand: (subscriberId: string): TopicSubscribeCommand => ({topic: 'topic', subscriberId, headers: new Map()}),
};
Beans.get(BrokerGateway).subscribe$(descriptor).subscribe(captor);
// WHEN
await MicrofrontendPlatform.destroy();
// THEN
expect(captor.hasCompleted()).toBeFalse();
expect(captor.hasErrored()).toBeFalse();
expect(captor.getValues()).toEqual([]);
});

it('should not complete `subscribeToIntent$` Observable upon platform shutdown (as per API)', async () => {
await MicrofrontendPlatform.startHost({applications: []});

// GIVEN
const captor = new ObserveCaptor();
const descriptor: SubscriptionDescriptor = {
messageChannel: MessagingChannel.Intent,
subscribeChannel: MessagingChannel.IntentSubscribe,
unsubscribeChannel: MessagingChannel.IntentUnsubscribe,
newSubscribeCommand: (subscriberId: string): IntentSubscribeCommand => ({subscriberId, headers: new Map()}),
};
Beans.get(BrokerGateway).subscribe$(descriptor).subscribe(captor);
// WHEN
await MicrofrontendPlatform.destroy();
// THEN
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2020 Swiss Federal Railways
* Copyright (c) 2018-2022 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -8,7 +8,7 @@
* SPDX-License-Identifier: EPL-2.0
*/
import {AsyncSubject, EMPTY, firstValueFrom, fromEvent, interval, lastValueFrom, merge, MonoTypeOperatorFunction, NEVER, noop, Observable, Observer, of, ReplaySubject, Subject, TeardownLogic, throwError, timeout, timer} from 'rxjs';
import {ConnackMessage, IntentSubscribeCommand, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, SubscribeCommand, TopicSubscribeCommand, UnsubscribeCommand} from '../../ɵmessaging.model';
import {ConnackMessage, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, SubscribeCommand, UnsubscribeCommand} from '../../ɵmessaging.model';
import {finalize, map, mergeMap, take, takeUntil, tap} from 'rxjs/operators';
import {filterByChannel, filterByMessageHeader, filterByOrigin, filterByTopicChannel, filterByTransport, filterByWindow, pluckMessage} from '../../operators';
import {UUID} from '@scion/toolkit/uuid';
Expand All @@ -23,7 +23,6 @@ import {MicrofrontendPlatformRef} from '../../microfrontend-platform-ref';
import {MessageClient} from '../../client/messaging/message-client';
import {runSafe} from '../../safe-runner';
import {stringifyError} from '../../error.util';
import {IntentSelector} from './intent-client';

/**
* The gateway is responsible for dispatching messages between the client and the broker.
Expand Down Expand Up @@ -55,15 +54,9 @@ export abstract class BrokerGateway {
public abstract requestReply$<T = any>(channel: MessagingChannel, message: IntentMessage | TopicMessage): Observable<TopicMessage<T>>;

/**
* Subscribes to messages published to the given topic. The Observable never completes.
*/
public abstract subscribeToTopic$<T>(topic: string): Observable<TopicMessage<T>>;

/**
* Subscribes to intents that match the specified selector and for which the application provides a fulfilling capability.
* The Observable never completes.
* Subscribes to described destination, unless the platform has been stopped at the time of subscription.
*/
public abstract subscribeToIntent$<T>(selector?: IntentSelector): Observable<IntentMessage<T>>;
public abstract subscribe$<T extends Message>(subscriptionDescriptor: SubscriptionDescriptor): Observable<T>;

/**
* An Observable that emits when a message from the message broker is received.
Expand Down Expand Up @@ -100,11 +93,7 @@ export class NullBrokerGateway implements BrokerGateway {
return NEVER;
}

public subscribeToTopic$<T>(topic: string): Observable<TopicMessage<T>> {
return NEVER;
}

public subscribeToIntent$<T>(selector: IntentSelector): Observable<IntentMessage<T>> {
public subscribe$<T extends Message>(subscriptionDescriptor: SubscriptionDescriptor): Observable<T> {
return NEVER;
}
}
Expand Down Expand Up @@ -206,58 +195,47 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
await whenPosted;
}

public requestReply$<T = any>(channel: MessagingChannel, message: IntentMessage | TopicMessage): Observable<TopicMessage<T>> {
public requestReply$<T = any>(channel: MessagingChannel, request: IntentMessage | TopicMessage): Observable<TopicMessage<T>> {
return new Observable((observer: Observer<TopicMessage>): TeardownLogic => {
if (isPlatformStopped()) {
observer.error(GatewayErrors.PLATFORM_STOPPED_ERROR);
return noop;
}

const replyTo = UUID.randomUUID();
const subscriberId = UUID.randomUUID();
const unsubscribe$ = new Subject<void>();
const requestError$ = new Subject<never>();

// Add 'ReplyTo' topic to the message headers where to receive the response(s).
message.headers.set(MessageHeaders.ReplyTo, replyTo);
request.headers
.set(MessageHeaders.ReplyTo, replyTo) // message header for the replier where to send replies to
.set(MessageHeaders.ɵSubscriberId, subscriberId); // message header to subscribe for replies

// Receive replies sent to the reply topic.
merge(this.subscribeToTopic$<T>(replyTo), requestError$)
.pipe(takeUntil(merge(this._platformStopping$, unsubscribe$)))
merge(this.message$, requestError$)
.pipe(
filterByChannel<TopicMessage<T>>(MessagingChannel.Topic),
filterByMessageHeader({name: MessageHeaders.ɵSubscriberId, value: subscriberId}),
pluckMessage(),
takeUntil(merge(this._platformStopping$, unsubscribe$)),
finalize(() => this.unsubscribe({unsubscribeChannel: MessagingChannel.TopicUnsubscribe, subscriberId, logContext: `[subscriberId=${subscriberId}, topic=${replyTo}]`})),
)
.subscribe({
next: reply => observer.next(reply),
error: error => observer.error(error),
complete: noop, // As per the API, the Observable never completes.
});

// Post the request to the broker.
this.postMessage(channel, message)
this.postMessage(channel, request)
.catch(error => requestError$.error(error));

return (): void => unsubscribe$.next();
});
}

public subscribeToTopic$<T>(topic: string): Observable<TopicMessage<T>> {
return this.subscribe$((subscriberId: string): TopicSubscribeCommand => ({topic, subscriberId, headers: new Map()}), {
messageChannel: MessagingChannel.Topic,
subscribeChannel: MessagingChannel.TopicSubscribe,
unsubscribeChannel: MessagingChannel.TopicUnsubscribe,
});
}

public subscribeToIntent$<T>(selector?: IntentSelector): Observable<IntentMessage<T>> {
return this.subscribe$((subscriberId: string): IntentSubscribeCommand => ({selector, subscriberId, headers: new Map()}), {
messageChannel: MessagingChannel.Intent,
subscribeChannel: MessagingChannel.IntentSubscribe,
unsubscribeChannel: MessagingChannel.IntentUnsubscribe,
});
}

/**
* Subscribes to described destination, unless the platform has been stopped at the time of subscription.
*/
private subscribe$<T extends Message>(produceSubscribeCommand: (subscriberId: string) => SubscribeCommand, descriptor: {messageChannel: MessagingChannel; subscribeChannel: MessagingChannel; unsubscribeChannel: MessagingChannel}): Observable<T> {
const {messageChannel, subscribeChannel, unsubscribeChannel} = descriptor;
public subscribe$<T extends Message>(subscriptionDescriptor: SubscriptionDescriptor): Observable<T> {
const {messageChannel, subscribeChannel, unsubscribeChannel, newSubscribeCommand} = subscriptionDescriptor;

return new Observable((observer: Observer<T>): TeardownLogic => {
if (isPlatformStopped()) {
Expand All @@ -268,7 +246,6 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
const subscriberId = UUID.randomUUID();
const unsubscribe$ = new Subject<void>();
const subscribeError$ = new Subject<never>();
const subscribeCommand: SubscribeCommand = produceSubscribeCommand(subscriberId);

// Receive messages of given subscription.
merge(this.message$, subscribeError$)
Expand All @@ -277,7 +254,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
filterByMessageHeader({name: MessageHeaders.ɵSubscriberId, value: subscriberId}),
pluckMessage(),
takeUntil(merge(this._platformStopping$, unsubscribe$)),
finalize(() => this.unsubscribe({unsubscribeChannel, subscriberId, logContext: JSON.stringify(subscribeCommand)})),
finalize(() => this.unsubscribe({unsubscribeChannel, subscriberId, logContext: JSON.stringify(newSubscribeCommand(subscriberId))})),
)
.subscribe({
next: message => observer.next(message),
Expand All @@ -286,7 +263,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
});

// Post the subscription to the broker.
this.postMessage(subscribeChannel, subscribeCommand)
this.postMessage(subscribeChannel, newSubscribeCommand(subscriberId))
.catch(error => subscribeError$.error(error));

return (): void => unsubscribe$.next();
Expand Down Expand Up @@ -539,3 +516,27 @@ namespace GatewayErrors {
return Error(`[GatewayError] Message broker not discovered within ${timeout}ms. Messages cannot be published or received.`);
}
}

/**
* Describes how to subscribe for messages.
*
* @ignore
*/
export interface SubscriptionDescriptor {
/**
* Channel for receiving subscribed messages.
*/
messageChannel: MessagingChannel;
/**
* Channel to send the subscribe request.
*/
subscribeChannel: MessagingChannel;
/**
* Channel to send the unsubscribe request.
*/
unsubscribeChannel: MessagingChannel;
/**
* Callback that is invoked to create the subscription command that will be sent over the subscription channel.
*/
newSubscribeCommand: (subscriberId: string) => SubscribeCommand;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1239,15 +1239,29 @@ describe('Messaging', () => {

// Subscribe and wait until the initial subscription count, which is 0, is reported.
const subscriberCountCaptor = new ObserveCaptor();

Beans.get(MessageClient).subscriberCount$('some-topic').subscribe(subscriberCountCaptor);
await subscriberCountCaptor.waitUntilEmitCount(1);

const subscription1 = Beans.get(MessageClient).observe$<string>('some-topic').subscribe();
await subscriberCountCaptor.waitUntilEmitCount(2);

subscription1.unsubscribe();
await subscriberCountCaptor.waitUntilEmitCount(3);

Beans.get(MessageClient).observe$<string>('some-topic').subscribe().unsubscribe();
const subscription2 = Beans.get(MessageClient).observe$<string>('some-topic').subscribe();
await subscriberCountCaptor.waitUntilEmitCount(4);

const subscription3 = Beans.get(MessageClient).observe$<string>('some-topic').subscribe();
await subscriberCountCaptor.waitUntilEmitCount(5);

subscription2.unsubscribe();
await subscriberCountCaptor.waitUntilEmitCount(6);

subscription3.unsubscribe();
await subscriberCountCaptor.waitUntilEmitCount(7);

await expectEmissions(subscriberCountCaptor).toEqual([0, 1, 0, 1, 2, 1, 0]);
expect(subscriberCountCaptor.getValues()).toEqual([0, 1, 0, 1, 2, 1, 0]);
expect(subscriberCountCaptor.hasCompleted()).withContext('hasCompleted').toBeFalse();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import {defer, Observable, Subscription} from 'rxjs';
import {Intent, IntentMessage, throwOnErrorStatus, TopicMessage} from '../../messaging.model';
import {BrokerGateway} from './broker-gateway';
import {MessagingChannel} from '../../ɵmessaging.model';
import {IntentSubscribeCommand, MessagingChannel} from '../../ɵmessaging.model';
import {assertExactQualifier} from '../../qualifier-matcher';
import {IntentClient, IntentOptions, IntentSelector} from './intent-client';
import {Beans} from '@scion/toolkit/bean-manager';
Expand Down Expand Up @@ -51,7 +51,12 @@ export class ɵIntentClient implements IntentClient {
}

public observe$<T>(selector?: IntentSelector): Observable<IntentMessage<T>> {
return this._brokerGateway.subscribeToIntent$<T>(selector);
return this._brokerGateway.subscribe$({
messageChannel: MessagingChannel.Intent,
subscribeChannel: MessagingChannel.IntentSubscribe,
unsubscribeChannel: MessagingChannel.IntentUnsubscribe,
newSubscribeCommand: (subscriberId: string): IntentSubscribeCommand => ({selector, subscriberId, headers: new Map()}),
});
}

public onIntent<IN = any, OUT = any>(selector: IntentSelector, callback: (intentMessage: IntentMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import {defer, noop, Observable, Subject, Subscription} from 'rxjs';
import {IntentMessage, mapToBody, throwOnErrorStatus, TopicMessage} from '../../messaging.model';
import {BrokerGateway} from './broker-gateway';
import {MessagingChannel, PlatformTopics} from '../../ɵmessaging.model';
import {MessagingChannel, PlatformTopics, TopicSubscribeCommand} from '../../ɵmessaging.model';
import {Topics} from '../../topics.util';
import {MessageClient, PublishOptions, RequestOptions} from './message-client';
import {Beans} from '@scion/toolkit/bean-manager';
Expand Down Expand Up @@ -48,7 +48,12 @@ export class ɵMessageClient implements MessageClient {

public observe$<T>(topic: string): Observable<TopicMessage<T>> {
assertTopic(topic, {allowWildcardSegments: true});
return this._brokerGateway.subscribeToTopic$<T>(topic);
return this._brokerGateway.subscribe$({
messageChannel: MessagingChannel.Topic,
subscribeChannel: MessagingChannel.TopicSubscribe,
unsubscribeChannel: MessagingChannel.TopicUnsubscribe,
newSubscribeCommand: (subscriberId: string): TopicSubscribeCommand => ({topic, subscriberId, headers: new Map()}),
});
}

public onMessage<IN = any, OUT = any>(topic: string, callback: (message: TopicMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ export interface Client {
*/
markStaleAndQueueForRemoval(): void;

/**
* Deprecated APIs used by the client.
*/
readonly deprecations: {
/**
* @deprecated since version 1.0.0-rc.8; Legacy support will be removed in version 1.0.0.
*/
legacyIntentSubscriptionApi: boolean;
/**
* @deprecated since version 1.0.0-rc.9; Legacy support will be removed in version 1.0.0.
*/
legacyRequestResponseSubscriptionApi: boolean;
};

/**
* Releases resources allocated by this client.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import {ClientRegistry} from './client.registry';
import {CLIENT_HEARTBEAT_INTERVAL, STALE_CLIENT_UNREGISTER_DELAY} from './client.constants';
import {Client} from './client';
import {ɵApplication} from '../application-registry';
import {IntentSubscription, IntentSubscriptionRegistry} from '../message-broker/intent-subscription.registry';
import {UUID} from '@scion/toolkit/uuid';

export class ɵClient implements Client {

public readonly version: string;
public readonly deprecations: {legacyIntentSubscriptionApi: boolean; legacyRequestResponseSubscriptionApi: boolean};
private _heartbeat: Subscription | undefined;
private _heartbeatInterval: number;
private _staleClientUnregisterTimer: Subscription | undefined;
Expand All @@ -37,6 +40,17 @@ export class ɵClient implements Client {
this._heartbeatInterval = Beans.get(CLIENT_HEARTBEAT_INTERVAL);
this._staleClientUnregisterDelay = Beans.get(STALE_CLIENT_UNREGISTER_DELAY);
this.installHeartbeatMonitor();
this.deprecations = {
legacyIntentSubscriptionApi: semver.lt(this.version, '1.0.0-rc.8'),
legacyRequestResponseSubscriptionApi: semver.lt(this.version, '1.0.0-rc.9'),
};
if (this.deprecations.legacyIntentSubscriptionApi) {
this.installLegacyClientIntentSubscription();
Beans.get(Logger).warn(`[DEPRECATION][FE93C94] Application "${application.symbolicName}" is using a legacy protocol for subscribing to intents. Please update @scion/microfrontend-platform to version '${Beans.get(ɵVERSION)}'.`, new LoggingContext(application.symbolicName, this.version));
}
if (this.deprecations.legacyRequestResponseSubscriptionApi) {
Beans.get(Logger).warn(`[DEPRECATION][F6DC38E] Application "${application.symbolicName}" is using a legacy request-response communication protocol. Please update @scion/microfrontend-platform to version '${Beans.get(ɵVERSION)}'.`, new LoggingContext(application.symbolicName, this.version));
}
}

/**
Expand Down Expand Up @@ -104,4 +118,12 @@ export class ɵClient implements Client {
new LoggingContext(this.application.symbolicName, this.version),
);
}

/**
* Installs legacy intent subscription support for clients older than version 1.0.0-rc.8.
*/
private installLegacyClientIntentSubscription(): void {
const legacyClientSubscription = new IntentSubscription({}, UUID.randomUUID(), this);
Beans.get(IntentSubscriptionRegistry).register(legacyClientSubscription);
}
}
Loading

0 comments on commit cb23da3

Please sign in to comment.