PlayMesh/Docs
HomeGitHubnpm

Queues

Background job processing powered by BullMQ — requires Redis.

Overview

PlayMesh includes a thin BullMQ wrapper via mesh.queues. Queues are available when Redis is configured and are automatically closed during mesh.shutdown().
Queues require Redis. Accessing mesh.queues without Redis configured throws immediately.

QueueManager API

Method / PropertyDescriptionReturns
mesh.queues.queue(name, options?)Get or create a BullMQ Queue. Returns the same instance on repeated calls.Queue
mesh.queues.worker(name, processor, options?)Create a BullMQ Worker. Workers are closed on shutdown.Worker

Basic usage

Adding jobs

const matchmakingQueue = mesh.queues.queue('matchmaking');

lobby.on('join-queue', async (session, payload) => {
  await matchmakingQueue.add('find-match', {
    userId: session.userId,
    rank: (payload as { rank: number }).rank,
  });
  session.send('queued', { position: await matchmakingQueue.count() });
});

Processing jobs

mesh.onStarted(async () => {
  mesh.queues.worker('matchmaking', async job => {
    const { userId, rank } = job.data;
    const opponent = await matchmakingService.findOpponent(userId, rank);

    if (!opponent) {
      // Re-queue for later
      await mesh.queues.queue('matchmaking').add('find-match', job.data, { delay: 5000 });
      return;
    }

    const match = mesh.domain('ranked').createInstance(`match-${Date.now()}`);

    for (const uid of [userId, opponent.userId]) {
      const session = mesh.sessions.find(s => s.userId === uid);
      if (session) {
        await session.leave(mesh.domain('ranked').instance('lobby'));
        await session.join(match);
      }
    }

    match.broadcast('match-found', { players: [userId, opponent.userId] });
  });
});

Delayed jobs

const eventsQueue = mesh.queues.queue('world-events');

// Schedule a boss spawn in 10 minutes
await eventsQueue.add(
  'dragon-spawn',
  { zone: 'forest', bossId: 'elder-dragon' },
  { delay: 10 * 60 * 1000 }
);

mesh.queues.worker('world-events', async job => {
  if (job.name === 'dragon-spawn') {
    const { zone, bossId } = job.data;
    const instance = mesh.resolveInstance(zone);
    await instance.state.set(`boss:${bossId}:health`, 50000);
    instance.broadcast('boss-spawned', { bossId, zone });
  }
});

Repeatable jobs

const tickQueue = mesh.queues.queue('match-tick');

await tickQueue.add('tick', { matchId: 'match-42' }, { repeat: { every: 5000 } });

mesh.queues.worker('match-tick', async job => {
  const instance = mesh.resolveInstance(job.data.matchId);
  const startedAt = await instance.state.get('started-at') as number;
  const elapsed = Date.now() - startedAt;

  instance.broadcast('tick', { elapsed });

  if (elapsed >= 5 * 60 * 1000) {
    instance.broadcast('time-up', {});
    await tickQueue.removeRepeatable('tick', { every: 5000 });
  }
});

Use cases

  • Matchmaking — queue players, poll for opponents, create match instances
  • Scheduled world events — boss spawns, server events, timed rewards
  • Match ticks — periodic game state updates
  • Delayed notifications — "Your item has been crafted"
  • Background saves — async persistence without blocking handlers
  • Analytics pipelines — batch event processing