Skip to content

Commit

Permalink
feat: add some utility methods
Browse files Browse the repository at this point in the history
This commit adds the following methods:

- fetchSockets: return the matching socket instances
- addSockets: make the matching socket instances join the specified rooms
- delSockets: make the matching socket instances leave the specified rooms
- disconnectSockets: disconnect the matching socket instances

Those methods will then be exposed by the Socket.IO server:

```js
// clear room
io.socketsLeave("room1");

// disconnect all sockets in room
io.in("room2").disconnectSockets();

// fetch socket instances in room
io.in("room3").fetchSockets();
```

This feature will also be extended in the Redis adapter to handle
multiple Socket.IO servers.
  • Loading branch information
darrachequesne committed Feb 27, 2021
1 parent 985bb41 commit 1c9827e
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 52 deletions.
142 changes: 93 additions & 49 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,53 +125,19 @@ export class Adapter extends EventEmitter {
* @public
*/
public broadcast(packet: any, opts: BroadcastOptions): void {
const rooms = opts.rooms;
const flags = opts.flags || {};
const packetOpts = {
preEncoded: true,
volatile: flags.volatile,
compress: flags.compress
};
const ids = new Set();
let except = opts.except || new Set();

packet.nsp = this.nsp.name;
const encodedPackets = this.encoder.encode(packet);

// Allow ids in `except` to be room ids.
if (except.size > 0) {
const exclude = except;
except = new Set(except);
for (const id of exclude) {
if (!this.rooms.has(id)) continue;
for (const sid of this.rooms.get(id)) {
if (sid !== id) {
except.add(sid);
}
}
}
}

if (rooms.size) {
for (const room of rooms) {
if (!this.rooms.has(room)) continue;

for (const id of this.rooms.get(room)) {
if (ids.has(id) || except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) {
socket.packet(encodedPackets, packetOpts);
ids.add(id);
}
}
}
} else {
for (const [id] of this.sids) {
if (except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) socket.packet(encodedPackets, packetOpts);
}
}
this.apply(opts, socket => {
socket.packet(encodedPackets, packetOpts);
});
}

/**
Expand All @@ -182,31 +148,109 @@ export class Adapter extends EventEmitter {
public sockets(rooms: Set<Room>): Promise<Set<SocketId>> {
const sids = new Set<SocketId>();

this.apply({ rooms }, socket => {
sids.add(socket.id);
});

return Promise.resolve(sids);
}

/**
* Gets the list of rooms a given socket has joined.
*
* @param {SocketId} id the socket id
*/
public socketRooms(id: SocketId): Set<Room> | undefined {
return this.sids.get(id);
}

/**
* Returns the matching socket instances
*
* @param opts - the filters to apply
*/
public fetchSockets(opts: BroadcastOptions): Promise<any[]> {
const sockets = [];

this.apply(opts, socket => {
sockets.push(socket);
});

return Promise.resolve(sockets);
}

/**
* Makes the matching socket instances join the specified rooms
*
* @param opts - the filters to apply
* @param rooms - the rooms to join
*/
public addSockets(opts: BroadcastOptions, rooms: Room[]): void {
this.apply(opts, socket => {
socket.join(rooms);
});
}

/**
* Makes the matching socket instances leave the specified rooms
*
* @param opts - the filters to apply
* @param rooms - the rooms to leave
*/
public delSockets(opts: BroadcastOptions, rooms: Room[]): void {
this.apply(opts, socket => {
rooms.forEach(room => socket.leave(room));
});
}

/**
* Makes the matching socket instances disconnect
*
* @param opts - the filters to apply
* @param close - whether to close the underlying connection
*/
public disconnectSockets(opts: BroadcastOptions, close: boolean): void {
this.apply(opts, socket => {
socket.disconnect(close);
});
}

private apply(opts: BroadcastOptions, callback: (socket) => void): void {
const rooms = opts.rooms;
const except = this.computeExceptSids(opts.except);

if (rooms.size) {
const ids = new Set();
for (const room of rooms) {
if (!this.rooms.has(room)) continue;

for (const id of this.rooms.get(room)) {
if (this.nsp.sockets.has(id)) {
sids.add(id);
if (ids.has(id) || except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) {
callback(socket);
ids.add(id);
}
}
}
} else {
for (const [id] of this.sids) {
if (this.nsp.sockets.has(id)) sids.add(id);
if (except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) callback(socket);
}
}

return Promise.resolve(sids);
}

/**
* Gets the list of rooms a given socket has joined.
*
* @param {SocketId} id the socket id
*/
public socketRooms(id: SocketId): Set<Room> | undefined {
return this.sids.get(id);
private computeExceptSids(exceptRooms?: Set<Room>) {
const exceptSids = new Set();
if (exceptRooms && exceptRooms.size > 0) {
for (const room of exceptRooms) {
if (this.rooms.has(room)) {
this.rooms.get(room).forEach(sid => exceptSids.add(sid));
}
}
}
return exceptSids;
}
}
47 changes: 44 additions & 3 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ describe("socket.io-adapter", () => {
const adapter = new Adapter({
server: { encoder: null },
sockets: new Map([
["s1", true],
["s2", true],
["s3", true]
["s1", { id: "s1" }],
["s2", { id: "s2" }],
["s3", { id: "s3" }]
])
});
adapter.addAll("s1", new Set(["r1", "r2"]));
Expand Down Expand Up @@ -124,6 +124,47 @@ describe("socket.io-adapter", () => {
expect(ids).to.eql(["s3"]);
});

describe("utility methods", () => {
let adapter;

before(() => {
adapter = new Adapter({
server: { encoder: null },
sockets: new Map([
["s1", { id: "s1" }],
["s2", { id: "s2" }],
["s3", { id: "s3" }]
])
});
});

describe("fetchSockets", () => {
it("returns the matching socket instances", async () => {
adapter.addAll("s1", new Set(["s1"]));
adapter.addAll("s2", new Set(["s2"]));
adapter.addAll("s3", new Set(["s3"]));
const matchingSockets = await adapter.fetchSockets({
rooms: new Set()
});
expect(matchingSockets).to.be.an(Array);
expect(matchingSockets.length).to.be(3);
});

it("returns the matching socket instances within room", async () => {
adapter.addAll("s1", new Set(["r1", "r2"]));
adapter.addAll("s2", new Set(["r1"]));
adapter.addAll("s3", new Set(["r2"]));
const matchingSockets = await adapter.fetchSockets({
rooms: new Set(["r1"]),
except: new Set(["r2"])
});
expect(matchingSockets).to.be.an(Array);
expect(matchingSockets.length).to.be(1);
expect(matchingSockets[0].id).to.be("s2");
});
});
});

describe("events", () => {
it("should emit a 'create-room' event", done => {
const adapter = new Adapter({ server: { encoder: null } });
Expand Down

0 comments on commit 1c9827e

Please sign in to comment.