Node.js Atomics API Tutorial

Table of Contents

  1. Introduction
  2. Prerequisites
  3. What are Atomics?
  4. Setting Up SharedArrayBuffer
  5. Basic Atomic Operations
  6. Synchronization with Atomics
  7. Practical Examples
  8. Best Practices
  9. Common Pitfalls
  10. Resources

Introduction

The Atomics API in JavaScript provides a way to perform atomic operations on shared memory. When working with multiple threads (via Worker Threads in Node.js or Web Workers in browsers), regular operations on shared memory can lead to race conditions. Atomics solve this by ensuring operations complete entirely before any other thread can interfere.

Think of atomics like a "reservation system" for memory locations. When one thread wants to modify a value, it can do so atomically - meaning no other thread can interrupt the operation halfway through.

Prerequisites

  • Node.js 12 or later recommended (Worker Threads were added in 10.5.0 but required the --experimental-worker flag until 11.7.0)
  • Basic understanding of JavaScript
  • Familiarity with concepts like threads and concurrency (helpful but not required)

What are Atomics?

Atomic operations are indivisible operations that appear to occur instantaneously from the perspective of other threads. In JavaScript, the Atomics object provides static methods that perform these operations on integer typed arrays (such as Int32Array), typically views over a SharedArrayBuffer.

Why Do We Need Atomics?

Consider this scenario without atomics:

// Thread 1: counter = counter + 1
// Thread 2: counter = counter + 1
// Expected result: counter increases by 2
// Possible actual result: counter increases by only 1 (race condition)

With atomics, we can ensure thread-safe operations:

// Both threads use: Atomics.add(sharedArray, index, 1)
// Guaranteed result: counter increases by 2
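
The lost update can be reproduced deterministically by splitting the non-atomic increment into its read and write halves and interleaving them by hand. The sketch below runs on a single thread purely for illustration (the variable names are made up for this example); real races depend on timing and will not show up on every run.

```javascript
// A 4-byte shared buffer viewed as a single 32-bit counter
const counterView = new Int32Array(new SharedArrayBuffer(4));

// "counter = counter + 1" is really a read followed by a write.
// Interleave two such increments the way an unlucky schedule would:
const readByThread1 = counterView[0]; // "thread 1" reads 0
const readByThread2 = counterView[0]; // "thread 2" also reads 0
counterView[0] = readByThread1 + 1;   // "thread 1" writes 1
counterView[0] = readByThread2 + 1;   // "thread 2" overwrites it with 1
const lostUpdateResult = counterView[0];

// Atomics.add performs the read and the write as one indivisible step
counterView[0] = 0;
Atomics.add(counterView, 0, 1);
Atomics.add(counterView, 0, 1);
const atomicResult = Atomics.load(counterView, 0);

console.log({ lostUpdateResult, atomicResult }); // { lostUpdateResult: 1, atomicResult: 2 }
```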

Setting Up SharedArrayBuffer

Before using Atomics, you need a SharedArrayBuffer - a buffer that can be shared between multiple threads.

// Create a shared buffer with 1024 bytes
const sharedBuffer = new SharedArrayBuffer(1024);

// Create typed arrays to work with the buffer
const sharedInt32Array = new Int32Array(sharedBuffer);
const sharedUint8Array = new Uint8Array(sharedBuffer);

// Note: Different typed arrays can view the same underlying buffer
console.log('Buffer size:', sharedBuffer.byteLength); // 1024
console.log('Int32Array length:', sharedInt32Array.length); // 256 (1024/4)
console.log('Uint8Array length:', sharedUint8Array.length); // 1024
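
Because both views alias the same bytes, a write through one is visible through the other. A view can also cover only part of the buffer by passing a byte offset and length to the constructor, which is handy for reserving a header region. The layout below (a two-slot header followed by a payload) is an assumption made up for this sketch.

```javascript
const buffer = new SharedArrayBuffer(16);

// Two views over the whole buffer
const asInt32 = new Int32Array(buffer);
const asUint8 = new Uint8Array(buffer);

// 0x01010101 sets each of the four bytes to 1, regardless of byte order
asInt32[0] = 0x01010101;
const firstFourBytes = Array.from(asUint8.subarray(0, 4));

// Hypothetical layout: 2 Int32 header slots, then 8 payload bytes
const header = new Int32Array(buffer, 0, 2);  // bytes 0..7
const payload = new Uint8Array(buffer, 8, 8); // bytes 8..15
payload[0] = 255;

console.log(firstFourBytes);                // [ 1, 1, 1, 1 ]
console.log(header.length, payload.length); // 2 8
console.log(asUint8[8]);                    // 255 (same byte as payload[0])
```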

Basic Atomic Operations

1. Atomics.load() and Atomics.store()

These operations atomically read from and write to shared memory.

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
    // Main thread setup
    const sharedBuffer = new SharedArrayBuffer(16);
    const sharedArray = new Int32Array(sharedBuffer);
    
    // Atomically store a value
    Atomics.store(sharedArray, 0, 42);
    console.log('Stored value:', Atomics.load(sharedArray, 0)); // 42
    
    // Create worker thread
    const worker = new Worker(__filename, {
        workerData: { sharedBuffer }
    });
    
    worker.on('message', (data) => {
        console.log('Message from worker:', data);
        // Read the value modified by worker
        console.log('Value after worker modification:', Atomics.load(sharedArray, 0));
    });
    
} else {
    // Worker thread
    const sharedArray = new Int32Array(workerData.sharedBuffer);
    
    // Atomically read the value
    const currentValue = Atomics.load(sharedArray, 0);
    console.log('Worker read value:', currentValue);
    
    // Atomically store a new value
    Atomics.store(sharedArray, 0, currentValue * 2);
    
    parentPort.postMessage('Worker finished');
}

2. Atomics.add(), Atomics.sub(), and other arithmetic operations

const sharedBuffer = new SharedArrayBuffer(16);
const sharedArray = new Int32Array(sharedBuffer);

// Initialize with a value
Atomics.store(sharedArray, 0, 10);

// Atomic addition - returns the old value
const oldValue = Atomics.add(sharedArray, 0, 5);
console.log('Old value:', oldValue); // 10
console.log('New value:', Atomics.load(sharedArray, 0)); // 15

// Atomic subtraction
Atomics.sub(sharedArray, 0, 3);
console.log('After subtraction:', Atomics.load(sharedArray, 0)); // 12

// Other operations
Atomics.and(sharedArray, 0, 0xFF); // Bitwise AND
Atomics.or(sharedArray, 0, 0x01);  // Bitwise OR
Atomics.xor(sharedArray, 0, 0x02); // Bitwise XOR
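
Like Atomics.add(), the bitwise operations return the value that was stored before the update, which makes them convenient for flag words. A minimal sketch follows; the flag names are invented for this example. It also uses Atomics.exchange(), which unconditionally swaps in a new value and returns the previous one.

```javascript
const flags = new Int32Array(new SharedArrayBuffer(4));

// Hypothetical flag bits, for illustration only
const FLAG_READY = 0b001;
const FLAG_DIRTY = 0b100;

// Set a bit; the return value is the word before the update
const beforeSet = Atomics.or(flags, 0, FLAG_DIRTY);      // 0
const afterSet = Atomics.load(flags, 0);                 // 4

// Clear the same bit by AND-ing with its complement
const beforeClear = Atomics.and(flags, 0, ~FLAG_DIRTY);  // 4
const afterClear = Atomics.load(flags, 0);               // 0

// Toggle a bit twice with XOR; the word ends up back at 0
Atomics.xor(flags, 0, FLAG_READY);
const beforeSecondToggle = Atomics.xor(flags, 0, FLAG_READY); // 1

// Unconditionally replace the word, getting the previous value back
const previous = Atomics.exchange(flags, 0, 7);          // 0
const finalValue = Atomics.load(flags, 0);               // 7
```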

3. Atomics.compareExchange()

This operation compares a value and exchanges it only if it matches the expected value. It's the foundation for many lock-free algorithms.

const sharedBuffer = new SharedArrayBuffer(16);
const sharedArray = new Int32Array(sharedBuffer);

Atomics.store(sharedArray, 0, 100);

// Compare and exchange: if value at index 0 is 100, replace it with 200
// compareExchange always returns the value that was there before the call
const previous = Atomics.compareExchange(sharedArray, 0, 100, 200);
console.log('Exchange successful:', previous === 100); // true
console.log('New value:', Atomics.load(sharedArray, 0)); // 200

// This exchange will fail because current value is 200, not 100
const unchanged = Atomics.compareExchange(sharedArray, 0, 100, 300);
console.log('Exchange failed:', unchanged !== 100); // true (returned 200, the current value)
console.log('Value unchanged:', Atomics.load(sharedArray, 0)); // 200
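
As one illustration of how compareExchange underpins other primitives, here is a minimal non-blocking "try lock". The function names and the 0/1 encoding are assumptions made for this sketch, not part of the Atomics API, and the demo runs on a single thread only to show the return values.

```javascript
const UNLOCKED = 0;
const LOCKED = 1;

// Attempt to take the lock without blocking; true means the caller now owns it
function tryLock(view, index) {
    return Atomics.compareExchange(view, index, UNLOCKED, LOCKED) === UNLOCKED;
}

// Release the lock so another caller can take it
function unlock(view, index) {
    Atomics.store(view, index, UNLOCKED);
}

const lockWord = new Int32Array(new SharedArrayBuffer(4));
const firstAttempt = tryLock(lockWord, 0);  // true: lock was free
const secondAttempt = tryLock(lockWord, 0); // false: already held
unlock(lockWord, 0);
const thirdAttempt = tryLock(lockWord, 0);  // true: free again

console.log(firstAttempt, secondAttempt, thirdAttempt); // true false true
```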

Synchronization with Atomics

Atomics.wait() and Atomics.notify()

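These methods let a thread sleep until another thread wakes it, similar to condition variables (or futexes) in other programming languages. Atomics.wait() puts the calling thread to sleep only if the value at the given index still equals the expected value. It returns 'ok' when woken by Atomics.notify(), 'timed-out' when the timeout elapses, and 'not-equal' (without sleeping) when the value already differs.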

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
    const sharedBuffer = new SharedArrayBuffer(16);
    const sharedArray = new Int32Array(sharedBuffer);
    
    // Initialize with 0 (waiting state)
    Atomics.store(sharedArray, 0, 0);
    
    const worker = new Worker(__filename, {
        workerData: { sharedBuffer }
    });
    
    // Wait 2 seconds, then notify the worker
    setTimeout(() => {
        console.log('Main thread: Setting value and notifying worker');
        Atomics.store(sharedArray, 0, 1);
        
        // Notify one waiting thread
        const notifiedCount = Atomics.notify(sharedArray, 0, 1);
        console.log('Notified threads:', notifiedCount);
    }, 2000);
    
    worker.on('message', (data) => {
        console.log('Worker message:', data);
    });
    
} else {
    const sharedArray = new Int32Array(workerData.sharedBuffer);
    
    console.log('Worker: Waiting for notification...');
    
    // Sleep while the value at index 0 is still 0, until notified or 5000ms pass
    const result = Atomics.wait(sharedArray, 0, 0, 5000);
    
    if (result === 'ok') {
        console.log('Worker: Received notification!');
        const value = Atomics.load(sharedArray, 0);
        parentPort.postMessage(`Worker woke up with value: ${value}`);
    } else if (result === 'timed-out') {
        console.log('Worker: Wait timed out');
        parentPort.postMessage('Worker timed out');
    } else {
        // 'not-equal': the value had already changed, so there was nothing to wait for
        const value = Atomics.load(sharedArray, 0);
        parentPort.postMessage(`Worker found value already set: ${value}`);
    }
}
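
The three possible outcomes of a wait can be seen in isolation with a short single-threaded sketch. It relies on Node.js permitting Atomics.wait() on the main thread (browsers do not), and the 10 ms timeout is an arbitrary choice for the demo. Since no other thread exists here, 'ok' never occurs.

```javascript
const signal = new Int32Array(new SharedArrayBuffer(4));

// The stored value is 0 but we claim to expect 1: no sleep, returns immediately
const mismatch = Atomics.wait(signal, 0, 1, 10);

// The value matches and nobody notifies: sleeps about 10 ms, then gives up
const expired = Atomics.wait(signal, 0, 0, 10);

// notify returns how many sleeping threads were woken; none are waiting here
const woken = Atomics.notify(signal, 0, 1);

console.log(mismatch, expired, woken); // not-equal timed-out 0
```

Blocking the main thread this way also stalls the event loop, so in real code the wait usually belongs in a worker.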

Practical Examples

Example 1: Thread-Safe Counter

const {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} = require("worker_threads");

class AtomicCounter {
  constructor(sharedBuffer, shouldInitialize = false) {
    this.sharedArray = new Int32Array(sharedBuffer);
    // Only initialize counter to 0 when explicitly requested (main thread)
    if (shouldInitialize) {
      Atomics.store(this.sharedArray, 0, 0);
    }
  }

  increment() {
    return Atomics.add(this.sharedArray, 0, 1);
  }

  decrement() {
    return Atomics.sub(this.sharedArray, 0, 1);
  }

  get value() {
    return Atomics.load(this.sharedArray, 0);
  }

  compareAndSet(expected, newValue) {
    return (
      Atomics.compareExchange(this.sharedArray, 0, expected, newValue) ===
      expected
    );
  }
}

if (isMainThread) {
  const sharedBuffer = new SharedArrayBuffer(16);
  const counter = new AtomicCounter(sharedBuffer, true);

  console.log("Initial counter value:", counter.value);

  // Create multiple workers
  const workersPromises = [];
  const numWorkers = 4;
  const incrementsPerWorker = 1000;

  for (let i = 0; i < numWorkers; i++) {
    const workerPromise = new Promise((resolve) => {
      const worker = new Worker(__filename, {
        workerData: { sharedBuffer, incrementsPerWorker, workerId: i },
      });

      worker.on("message", (data) => {
        console.log(`Worker ${data.workerId} finished`);
        resolve();
        worker.terminate();
      });
    });

    workersPromises.push(workerPromise);
  }

  // Wait for all workers to complete and terminate them
  Promise.all(workersPromises).then(() => {
    console.log("All workers finished");
    console.log("Final counter value:", counter.value);
    console.log("Expected value:", numWorkers * incrementsPerWorker);
  });
} else {
  const counter = new AtomicCounter(workerData.sharedBuffer);
  const { incrementsPerWorker, workerId } = workerData;

  // Each worker increments the counter many times
  for (let i = 0; i < incrementsPerWorker; i++) {
    counter.increment();
  }

  parentPort.postMessage({ workerId });
}

Example 2: Producer-Consumer Pattern

The producer-consumer pattern is a classic concurrency problem where some threads produce data while others consume it. Think of it like a factory conveyor belt where workers add items to the belt while other workers take items off. We need a thread-safe queue to manage this flow.

Understanding the Head and Tail Pointer System

Before diving into the code, let's understand how a circular queue works with head and tail pointers. Imagine a circular buffer as a race track where cars (data items) move in one direction:

  • Head pointer: Points to the next item to be consumed (removed from the queue). Think of this as where the "consumer" is currently reading from.
  • Tail pointer: Points to the next empty slot where new items can be added. This is where the "producer" will place the next item.

The key insight is that both pointers move forward in a circular fashion. When they reach the end of the buffer, they wrap around to the beginning.

Visual representation of queue states:

Empty queue (head = tail = 0):
[_][_][_][_][_]
 ^
head/tail

After adding one item (head = 0, tail = 1):
[X][_][_][_][_]
 ^  ^
head tail

After adding three items (head = 0, tail = 3):
[X][X][X][_][_]
 ^        ^
head     tail

After consuming one item (head = 1, tail = 3):
[_][X][X][_][_]
    ^     ^
   head  tail

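The queue is full when (tail + 1) % size == head, and empty when head == tail. One slot is deliberately left unused so that a full queue can be told apart from an empty one, which means a queue of size N holds at most N - 1 items.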
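
These conditions translate directly into index arithmetic. The helper names below are illustrative only and are not used by the queue class in this tutorial; the checks reuse the five-slot states from the diagrams above.

```javascript
// size is the number of slots in the ring
function isEmpty(head, tail) {
    return head === tail;
}

function isFull(head, tail, size) {
    return (tail + 1) % size === head;
}

// Number of items currently stored; adding size keeps the result non-negative
function itemCount(head, tail, size) {
    return (tail - head + size) % size;
}

const size = 5;
console.log(isEmpty(0, 0));            // true  (empty queue)
console.log(itemCount(0, 3, size));    // 3     (after adding three items)
console.log(itemCount(1, 3, size));    // 2     (after consuming one)
console.log(isFull(1, 0, size));       // true  (tail has wrapped around to just behind head)
console.log(itemCount(1, 0, size));    // 4     (a full 5-slot ring holds 4 items)
```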

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

class AtomicQueue {
    constructor(sharedBuffer, size, shouldInitialize = false) {
        this.sharedArray = new Int32Array(sharedBuffer);
        this.size = size;
        
        // Memory layout: [head, tail, ...data slots]
        // We reserve the first two positions for our pointers
        this.headIndex = 0;      // Index where head pointer is stored
        this.tailIndex = 1;      // Index where tail pointer is stored  
        this.dataStartIndex = 2; // Where actual data storage begins
        
        // Reset both pointers to 0 (empty queue state) only when asked to.
        // A new SharedArrayBuffer is already zero-filled, and every thread builds
        // its own AtomicQueue wrapper, so resetting unconditionally would let a
        // late-starting worker wipe out items that are already queued.
        // Head points to next item to consume, tail points to next slot to fill
        if (shouldInitialize) {
            Atomics.store(this.sharedArray, this.headIndex, 0);
            Atomics.store(this.sharedArray, this.tailIndex, 0);
        }
    }
    
    enqueue(value) {
        // Producer operation: Add an item to the queue
        while (true) {
            // Step 1: Get current head and tail positions
            // We need both to determine if queue is full
            const head = Atomics.load(this.sharedArray, this.headIndex);
            const tail = Atomics.load(this.sharedArray, this.tailIndex);
            
            // Step 2: Calculate where the next tail would be after adding an item
            // Use modulo to wrap around when we reach the end of the buffer
            const nextTail = (tail + 1) % this.size;
            
            // Step 3: Check if queue is full
            // Queue is full when the next tail position would equal head
            // This means there's no empty slot between the last item and first unconsumed item
            if (nextTail === head) {
                // Queue is full: retry until a consumer frees a slot.
                // Note that this is a busy-wait that blocks the calling thread;
                // a real application might return false here or sleep with
                // Atomics.wait() until a consumer signals free space
                continue;
            }
            
            // Step 4: Try to atomically claim the current tail slot
            // compareExchange ensures only one producer can advance tail from
            // this value, so only one producer claims this slot
            if (Atomics.compareExchange(this.sharedArray, this.tailIndex, tail, nextTail) === tail) {
                // Success! We've atomically moved the tail pointer forward
                // and own the slot at position 'tail'
                
                // Step 5: Store the actual data in the claimed slot
                // Note: We use the old tail value as our data index
                // Caveat: tail is published before the value is written, so a
                // consumer can briefly see the slot as occupied while it still
                // holds stale data. A production queue needs a per-slot ready
                // flag or sequence number to close this window
                Atomics.store(this.sharedArray, this.dataStartIndex + tail, value);
                return true;
            }
            
            // If compareExchange failed, another producer beat us to it
            // The tail has moved since we read it, so we loop and try again
            // This is the lock-free approach - we retry rather than block
        }
    }
    
    dequeue() {
        // Consumer operation: Remove an item from the queue
        while (true) {
            // Step 1: Get current head and tail positions
            const head = Atomics.load(this.sharedArray, this.headIndex);
            const tail = Atomics.load(this.sharedArray, this.tailIndex);
            
            // Step 2: Check if queue is empty
            // When head equals tail, there are no items to consume
            if (head === tail) {
                return null; // Queue is empty
            }
            
            // Step 3: Read the value we're about to consume
            // We read it before moving head; if the compareExchange below fails,
            // another consumer took this item and the value read here is discarded
            const value = Atomics.load(this.sharedArray, this.dataStartIndex + head);
            
            // Step 4: Calculate the next head position
            const nextHead = (head + 1) % this.size;
            
            // Step 5: Try to atomically claim this item by moving head forward
            // This is like taking a ticket from a queue - only one consumer can get it
            if (Atomics.compareExchange(this.sharedArray, this.headIndex, head, nextHead) === head) {
                // Success! We've claimed this item and moved the head pointer
                // Return the value we read earlier
                return value;
            }
            
            // If compareExchange failed, another consumer beat us to this item
            // We loop and try to get the next available item
        }
    }
}

if (isMainThread) {
    const queueSize = 10;
    const bufferSize = (queueSize + 2) * 4; // +2 for head and tail indices
    const sharedBuffer = new SharedArrayBuffer(bufferSize);
    const queue = new AtomicQueue(sharedBuffer, queueSize);
    
    // Create producer
    const producer = new Worker(__filename, {
        workerData: { sharedBuffer, queueSize, role: 'producer' }
    });
    
    // Create consumer
    const consumer = new Worker(__filename, {
        workerData: { sharedBuffer, queueSize, role: 'consumer' }
    });
    
    producer.on('message', (data) => {
        console.log('Producer:', data);
    });
    
    consumer.on('message', (data) => {
        console.log('Consumer:', data);
    });
    
    // Clean up after 5 seconds
    setTimeout(() => {
        producer.terminate();
        consumer.terminate();
        console.log('Demo completed');
    }, 5000);
    
} else {
    const { sharedBuffer, queueSize, role } = workerData;
    const queue = new AtomicQueue(sharedBuffer, queueSize);
    
    if (role === 'producer') {
        let counter = 0;
        const produceInterval = setInterval(() => {
            const value = ++counter;
            if (queue.enqueue(value)) {
                parentPort.postMessage(`Produced: ${value}`);
            }
            
            if (counter >= 20) {
                clearInterval(produceInterval);
                parentPort.postMessage('Producer finished');
            }
        }, 200);
        
    } else if (role === 'consumer') {
        const consumeInterval = setInterval(() => {
            const value = queue.dequeue();
            if (value !== null) {
                parentPort.postMessage(`Consumed: ${value}`);
            }
        }, 300);
        
        // Consumer runs indefinitely until terminated
    }
}

Best Practices

1. Always Use Appropriate Data Types

// Good: Use an integer typed array over a SharedArrayBuffer
// (Int32Array or BigInt64Array is required for Atomics.wait/notify)
const sharedBuffer = new SharedArrayBuffer(16);
const atomicArray = new Int32Array(sharedBuffer);

// Avoid: Regular arrays and Float32Array/Float64Array are rejected by Atomics
// const regularArray = []; // Atomics methods throw a TypeError on this
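
The effect of the element type can be checked directly. The sketch below is a single-threaded illustration (all variable names are made up for it): it shows wraparound on a narrow type, BigInt values for 64-bit elements, and the TypeError thrown for unsupported views.

```javascript
const sab = new SharedArrayBuffer(16);

// Any integer typed array works for arithmetic; values wrap at the element width
const bytes = new Uint8Array(sab);
Atomics.store(bytes, 0, 255);
const oldByte = Atomics.add(bytes, 0, 1); // returns the old value, 255
const wrapped = Atomics.load(bytes, 0);   // 0 after wrapping past 255

// 64-bit elements use BigInt values (this view covers bytes 8..15)
const big = new BigInt64Array(sab, 8, 1);
Atomics.add(big, 0, 5n);
const bigValue = Atomics.load(big, 0);    // 5n

// Floating-point views are rejected
let floatError = null;
try {
    Atomics.add(new Float64Array(sab), 0, 1);
} catch (err) {
    floatError = err;
}

// wait() only accepts Int32Array or BigInt64Array
let waitError = null;
try {
    Atomics.wait(bytes, 0, 0, 0);
} catch (err) {
    waitError = err;
}

console.log(oldByte, wrapped, bigValue);  // 255 0 5n
console.log(floatError instanceof TypeError, waitError instanceof TypeError); // true true
```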

2. Handle Wait Timeouts

// Always specify timeouts for wait operations
const result = Atomics.wait(sharedArray, index, expectedValue, 5000);
if (result === 'timed-out') {
    console.log('Operation timed out, handling gracefully');
    // Handle timeout scenario
}

3. Use compareExchange for Complex Updates

// Good: Atomic read-modify-write
function atomicMax(sharedArray, index, newValue) {
    while (true) {
        const currentValue = Atomics.load(sharedArray, index);
        if (newValue <= currentValue) {
            return currentValue; // No update needed
        }
        
        if (Atomics.compareExchange(sharedArray, index, currentValue, newValue) === currentValue) {
            return newValue; // Successfully updated
        }
        // Loop again if compareExchange failed
    }
}

4. Minimize Shared State

// Good: Minimize what needs to be shared
class TaskManager {
    constructor(sharedBuffer) {
        this.sharedArray = new Int32Array(sharedBuffer);
        // Only share essential state
        this.taskCountIndex = 0;
        this.completedCountIndex = 1;
    }
    
    // Keep complex logic in individual threads
    processTaskLocally(taskData) {
        // Process task without shared state
        // (heavyComputation stands in for your own CPU-bound function)
        const result = heavyComputation(taskData);
        
        // Only update shared counters atomically
        Atomics.add(this.sharedArray, this.completedCountIndex, 1);
        
        return result;
    }
}

Common Pitfalls

1. Deadlocks with Multiple Wait Operations

// Dangerous: Can cause deadlock
// Thread 1 waits on index 0, Thread 2 waits on index 1
// Neither thread notifies the other

// Better: Use timeouts and structured waiting patterns
const result = Atomics.wait(sharedArray, 0, expectedValue, 1000);
if (result === 'timed-out') {
    // Handle timeout and avoid deadlock
}

2. Memory Ordering Assumptions

// Dangerous: Assuming plain (non-atomic) writes become visible in program order
sharedArray[0] = 1;  // Operation A
sharedArray[1] = 2;  // Operation B

// Another thread doing plain reads might see Operation B before Operation A
// Atomics operations are sequentially consistent, so if order matters,
// make the accesses atomic (at minimum the one that signals "data is ready")
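
One common way to make ordering explicit is a publish flag: the writer fills in the payload and then sets a flag with Atomics.store(), and the reader checks the flag with Atomics.load() before touching the payload. Under the ECMAScript memory model, a reader that observes the flag should also observe the writes made before it was set. The slot layout and function names below are assumptions for this sketch, which runs on one thread only to show the control flow.

```javascript
const view = new Int32Array(new SharedArrayBuffer(12));
const READY_INDEX = 0; // hypothetical layout: slot 0 is the "ready" flag
const DATA_INDEX = 1;  // slots 1 and 2 hold the payload

// Writer: fill the payload first, then publish it with an atomic store
function publish(a, b) {
    view[DATA_INDEX] = a;
    view[DATA_INDEX + 1] = b;
    Atomics.store(view, READY_INDEX, 1);
}

// Reader: check the flag atomically before touching the payload
function tryRead() {
    if (Atomics.load(view, READY_INDEX) !== 1) {
        return null; // nothing published yet
    }
    return [view[DATA_INDEX], view[DATA_INDEX + 1]];
}

const beforePublish = tryRead(); // null
publish(10, 20);
const afterPublish = tryRead();  // [10, 20]

console.log(beforePublish, afterPublish);
```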

3. Mixing Atomic and Non-Atomic Operations

// Dangerous: Mixing atomic and regular operations on memory other threads use
Atomics.store(sharedArray, 0, 1);  // Atomic
sharedArray[1] = 2;                // Non-atomic - a data race if another thread accesses index 1

// Good: Be consistent
Atomics.store(sharedArray, 0, 1);  // Atomic
Atomics.store(sharedArray, 1, 2);  // Atomic

Resources

Running the Examples

To run these examples:

  1. Save each example to a separate .js file
  2. Make sure you're using Node.js 12 or later (versions from 10.5.0 up to 11.7.0 need the --experimental-worker flag)
  3. Run with: node example-filename.js

Some examples are self-contained worker demonstrations - they will create their own worker threads automatically.


This tutorial provides a foundation for understanding and using the Atomics API in Node.js. As you become more comfortable with these concepts, you can explore more advanced patterns like lock-free data structures and complex synchronization primitives.