基于 ring buffer 和内存栅栏 __threadfence():

核心数据结构(无锁 SPSC 环形队列)

// lock_free_queue.cuh

#pragma once
#include <cuda_runtime.h>

template <typename T, int CapacityPow2>
struct LockFreeQueueSPSC {
    // 要求容量是 2 的整数次幂,方便用位与取模
    static_assert((CapacityPow2 & (CapacityPow2 - 1)) == 0,
                  "Capacity must be power of 2.");

    // 环形缓冲区
    T buffer[CapacityPow2];

    // head:下一个要 pop 的位置索引
    // tail:下一个要 push 的位置索引
    // 约定:
    //   - 只有消费者线程写 head
    //   - 只有生产者线程写 tail
    //   - 双方都可以读对方的索引
    unsigned int head;
    unsigned int tail;

    __device__ void init() {
        head = 0;
        tail = 0;
    }

    // push:在队列尾部插入一个元素
    // 返回 true 表示成功,false 表示队列已满
    __device__ bool push(const T &value) {
        // 生产者线程

        // 本线程写 tail,无数据竞争
        unsigned int t = tail;
        // 读消费者的 head,需要保证不被编译器乱优化,cast 成 volatile 指针
        unsigned int h = *reinterpret_cast<volatile unsigned int *>(&head);

        // 如果 tail - head >= capacity 就满了
        if (t - h >= CapacityPow2) {
            return false; // full
        }

        // 写入数据到环形缓冲区
        buffer[t & (CapacityPow2 - 1)] = value;

        // 保证 buffer 写入在 tail 更新前对其他线程可见
        __threadfence();

        // 更新 tail,让消费者看到新元素
        *reinterpret_cast<volatile unsigned int *>(&tail) = t + 1;
        return true;
    }

    // pop:从队列头部取出一个元素
    // 返回 true 表示成功,false 表示队列为空
    __device__ bool pop(T &out) {
        // 消费者线程

        // 本线程写 head,无数据竞争
        unsigned int h = head;
        // 读生产者的 tail
        unsigned int t = *reinterpret_cast<volatile unsigned int *>(&tail);

        if (h == t) {
            return false; // empty
        }

        // 在读取数据前,确保我们看到的是生产者已经完成写入的数据
        __threadfence();

        out = buffer[h & (CapacityPow2 - 1)];

        // 再加一道 fence,确保 out 读完后再移动 head(比较保守的写法)
        __threadfence();

        *reinterpret_cast<volatile unsigned int *>(&head) = h + 1;
        return true;
    }

    // 当前队列元素数量(近似值,非强一致)
    __device__ unsigned int size() const {
        unsigned int h = *reinterpret_cast<const volatile unsigned int *>(&head);
        unsigned int t = *reinterpret_cast<const volatile unsigned int *>(&tail);
        return t - h;
    }

    __device__ bool empty() const {
        return size() == 0;
    }

    __device__ bool full() const {
        return size() >= CapacityPow2;
    }
};

example:

// main.cu

#include <cstdio>
#include "lock_free_queue.cuh"

constexpr int QUEUE_CAPACITY = 1024;  // 必须是 2 的幂
constexpr int N_ITEMS        = 1000;  // 需要传输的元素数量

__global__ void queue_kernel(int *output) {
    // 单个 block 里共享一个队列对象(放在共享内存里也行,这里简单放全局)
    __shared__ LockFreeQueueSPSC<int, QUEUE_CAPACITY> queue;

    if (threadIdx.x == 0) {
        // 生产者初始化队列
        queue.init();
    }

    __syncthreads(); // 确保队列初始化完毕

    if (threadIdx.x == 0) {
        // 生产者
        int produced = 0;
        while (produced < N_ITEMS) {
            if (queue.push(produced)) {
                produced++;
            }
            // 如果满了,会暂时 push 失败,就继续循环重试(自旋)
        }
    } else if (threadIdx.x == 1) {
        // 消费者
        int consumed = 0;
        while (consumed < N_ITEMS) {
            int v;
            if (queue.pop(v)) {
                output[consumed] = v;
                consumed++;
            }
            // 如果空了,就继续循环等待数据(自旋)
        }
    }
}

int main() {
    int *d_out = nullptr;
    int *h_out = new int[N_ITEMS];

    cudaMalloc(&d_out, N_ITEMS * sizeof(int));

    // 启动一个 block,至少两个线程:
    //   thread 0: producer
    //   thread 1: consumer
    queue_kernel<<<1, 2>>>(d_out);
    cudaDeviceSynchronize();

    cudaMemcpy(h_out, d_out, N_ITEMS * sizeof(int), cudaMemcpyDeviceToHost);

    // 简单检查:应该是 0..N_ITEMS-1
    bool ok = true;
    for (int i = 0; i < N_ITEMS; ++i) {
        if (h_out[i] != i) {
            printf("Mismatch at %d: got %d expected %d\n", i, h_out[i], i);
            ok = false;
            break;
        }
    }

    if (ok) {
        printf("Queue works! First 10 values: ");
        for (int i = 0; i < 10 && i < N_ITEMS; ++i) {
            printf("%d ", h_out[i]);
        }
        printf("\n");
    }

    cudaFree(d_out);
    delete[] h_out;
    return 0;
}