MultiQueue2 is a fast bounded mpmc queue that supports broadcast-style operations.

MultiQueue was developed by Sam Schetterer but has not been updated for some time. I found it very useful because it implements futures. However, it depends on a few outdated library APIs, and its use of spinlocks consumes 100% CPU in many cases.

# Stone Age

In any channel, wherever conflicts are possible (more than one writer or reader using the same resource), there must be locks. So the 100% CPU issue is very likely caused by a spinlock.

At first, I tried the sleep approach:

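    #[inline(always)]
    pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        // Snapshot the shared position once per check.
        let cur_count = at.load(Relaxed);
        // This version also slept on every call; the sleep itself is not shown.
        wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1
    }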

However, this sacrificed throughput, because every check paid the sleep penalty, including the checks that would have succeeded immediately.
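To make the cost concrete, here is a minimal sketch of the sleep-on-every-check idea. It is not the crate's code: `ready`, `check_with_sleep`, and the `pause` parameter are hypothetical names, and the readiness test is simplified to a single counter comparison.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical stand-in for the queue's readiness test: the slot is
/// ready once the shared counter `at` has reached `seq`.
fn ready(seq: usize, at: &AtomicUsize) -> bool {
    at.load(Relaxed) >= seq
}

/// Sleep on every call, then check. Even a check that would have
/// succeeded immediately pays at least `pause`.
pub fn check_with_sleep(seq: usize, at: &AtomicUsize, pause: Duration) -> bool {
    sleep(pause);
    ready(seq, at)
}
```

With this shape, N successful checks cost at least N times `pause`, which is where the throughput goes.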

Then I tried a much smaller sleep. This was pointless, because such a short sleep does not move the CPU from its active P-state into a low-power idle state. I quickly realized that it may be foolish to try to force a low-power state from within a spinlock design.

Then I tried 1 ms, 10 ms, and 1000 ns, both checking after the wait and checking before the wait. It did not make much difference.

I also tried `_mm_pause` from SSE2, but it did not make any difference either.

    #[inline(always)]
    pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        if is_x86_feature_detected!("sse2") {
            #[cfg(target_arch = "x86")]
            use std::arch::x86::_mm_pause;
            #[cfg(target_arch = "x86_64")]
            use std::arch::x86_64::_mm_pause;
            // Hint to the CPU that this is a spin-wait loop.
            unsafe {
                _mm_pause();
            }
        }
        // Snapshot the shared position once per check.
        let cur_count = at.load(Relaxed);
        wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1
    }

Although `_mm_pause` is designed for spinlocks, the point is that the CPU will not leave its active P-state for a low-power idle state just for a few nanoseconds. It is simply not the right tool for lowering CPU usage.
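For reference, the standard library offers a portable spin hint, `std::hint::spin_loop()`, which compiles to `PAUSE` on x86 and x86_64. The sketch below is only an illustration of a hinted spin loop; `spin_until_set` and `max_spins` are hypothetical names, not part of MultiQueue2.

```rust
use std::hint::spin_loop;
use std::sync::atomic::{AtomicBool, Ordering::Acquire};

/// Busy-wait until `flag` becomes true, giving up after `max_spins` polls.
/// Returns true if the flag was seen set.
pub fn spin_until_set(flag: &AtomicBool, max_spins: usize) -> bool {
    for _ in 0..max_spins {
        if flag.load(Acquire) {
            return true;
        }
        // Spin hint: PAUSE on x86/x86_64, a similar hint on targets that
        // have one. It never yields to the OS, so the core stays busy.
        spin_loop();
    }
    flag.load(Acquire)
}
```

The hint makes the spin friendlier to a sibling hyperthread, but the thread is still runnable the whole time, so a CPU monitor keeps reporting the core as fully used.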

# Tool Age

Continuing with these trivial tweaks would get me no further (although sleeping had practically solved the 100% CPU usage in my program, which does not need to be very performant anyway), so I stepped back and reconsidered my first solution.

The first solution was really just checking less often and forcing the CPU to sleep, so that it does not waste energy on useless checks whose outcome is roughly 99% predictable.

What I really want to penalize are the checks that actually collide, because they are very likely to be blocked again if the CPU re-checks on the next cycle. In that case, sleeping is better than checking.

So I made the spinlock a swing-back one, which backs off only after a failed check:

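    #[inline(always)]
    pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        // Snapshot the shared position once per check.
        let cur_count = at.load(Relaxed);
        if wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1 {
            true
        } else {
            // Collision: back off here before reporting failure (the sleep is not shown).
            false
        }
    }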

This works exactly as expected: CPU usage dropped, and throughput is still high enough to pass all the tests.
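A self-contained sketch of the swing-back idea follows, under stated assumptions: the readiness test is reduced to one counter comparison, and `BACKOFF`, `check_with_backoff`, and `wait_for` are hypothetical names with an arbitrary 50 µs delay that would need tuning in practice.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread::sleep;
use std::time::Duration;

/// Assumed back-off delay; the right value depends on the workload.
const BACKOFF: Duration = Duration::from_micros(50);

/// Returns true if the slot is ready. A failed check sleeps before
/// returning false, so the caller's retry loop is throttled only
/// while it is actually blocked.
pub fn check_with_backoff(seq: usize, at: &AtomicUsize) -> bool {
    if at.load(Relaxed) >= seq {
        true
    } else {
        sleep(BACKOFF);
        false
    }
}

/// Caller side: retry until the check passes, counting failed attempts.
pub fn wait_for(seq: usize, at: &AtomicUsize) -> usize {
    let mut misses = 0;
    while !check_with_backoff(seq, at) {
        misses += 1;
    }
    misses
}
```

The successful path never sleeps, so an uncontended queue keeps its throughput, while a blocked reader or writer polls at most once per `BACKOFF` interval.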

# Futures

However, I then found that CPU usage was still high for the futures queues. It turned out that the futures queues use a kind of hybrid lock: they spin first and only then fall back to a mutex.

Therefore, I removed all the spinning that came before falling back to `parking_lot::Mutex::new(VecDeque::new())`.

This greatly increased the number of low-level context switches, but it did lower CPU usage. Because such a switch is heavy compared with a lightweight spinlock, the throughput of the futures queues dropped to about 1/10 of the normal queue's.

This experiment also taught me the rough performance ratio between parking_lot mutexes and native spinning.
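The spin-then-block shape can be sketched as below. This is a toy, not MultiQueue2's implementation: it uses `std::sync::Mutex` and `Condvar` instead of parking_lot so that it compiles without dependencies, and `HybridQueue` and its methods are hypothetical names. Only the `try_spins` knob is taken from the crate's vocabulary.

```rust
use std::collections::VecDeque;
use std::hint::spin_loop;
use std::sync::{Condvar, Mutex};

/// Toy unbounded queue that shows the hybrid wait only.
pub struct HybridQueue<T> {
    items: Mutex<VecDeque<T>>,
    not_empty: Condvar,
    try_spins: usize,
}

impl<T> HybridQueue<T> {
    pub fn new(try_spins: usize) -> Self {
        HybridQueue {
            items: Mutex::new(VecDeque::new()),
            not_empty: Condvar::new(),
            try_spins,
        }
    }

    pub fn push(&self, item: T) {
        self.items.lock().unwrap().push_back(item);
        self.not_empty.notify_one();
    }

    /// Phase 1: poll up to `try_spins` times (low latency, burns CPU).
    /// Phase 2: block on the condvar (no CPU, costs a context switch).
    pub fn pop(&self) -> T {
        for _ in 0..self.try_spins {
            if let Some(item) = self.items.lock().unwrap().pop_front() {
                return item;
            }
            spin_loop();
        }
        let mut guard = self.items.lock().unwrap();
        loop {
            if let Some(item) = guard.pop_front() {
                return item;
            }
            // Releases the lock while waiting and re-acquires it on wake-up.
            guard = self.not_empty.wait(guard).unwrap();
        }
    }
}
```

Setting `try_spins` to 0 corresponds to the experiment above, where every empty `pop` goes straight to the blocking path. Larger values trade CPU time for fewer context switches.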

# No Silver Bullet

I would really like to come up with an equation that lets people set `try_spins` precisely, but that is very complicated, because it depends on the whole environment: CPU frequency, number of consumers, feed rate, and so on.

So I just left it to the user.


unsafe impl<RW: QueueRW<T>, T> Sync for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T: Send> Send for InnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T: Send> Send for InnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, T: Send> Send for FutInnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T: Send> Send for FutInnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> Send for FutInnerUniRecv<RW, R, F, T> {}

/// Usage: futures_multiqueue(capacity)
/// This is equivalent to futures_multiqueue_with(capacity,50,20).
pub fn futures_multiqueue<RW: QueueRW<T>, T>(
capacity: Index,
) -> (FutInnerSend<RW, T>, FutInnerRecv<RW, T>) {
let cons_arc = Arc::new(FutWait::new());
let prod_arc = Arc::new(FutWait::new());
let (tx, rx) = MultiQueue::new_internal(capacity, cons_arc.clone());
let ftx = FutInnerSend {
writer: tx,
wait: cons_arc.clone(),
prod_wait: prod_arc.clone(),
};
let rtx = FutInnerRecv {
}