首页
社区
课程
招聘
[原创]一种多线程基于计数无锁实现
发表于: 2013-9-3 11:46 13517

[原创]一种多线程基于计数无锁实现

2013-9-3 11:46
13517

先申明下,该方案为我在实际编程中创造出来的,事先我没有在其中地方看到关于该方案的介绍。

     在多线程编程中,我们经常会遇到线程同步问题,这时候加锁就变得必不可少。但是锁的使用会或多或少带来某些性能上的下降。下面先介绍一个多线程编程中经常遇到的问题模型,然后实现一种无锁解决方案。

     问题模型:
     R:表示某种资源,线程A往R中存放资源,线程B从R中取出资源。

    先看看常规解决方法:
    线程A往R中存放资源
  1.获取锁(此处可能睡眠)。
  2.存入资源。
  3.修改资源计数。
  4.释放锁。

    线程B从R中取出资源
  1.获取锁(此处可能睡眠)。
  2.取出资源。
  3.修改资源计数。
  4.释放锁。

    下面针对该模型实现一种无锁的解决方案:
    首先定义一个数组ARRAY存在资源,假设数组的长度为L,然后再定义两个变量READ和WRITE。READ表示读计数,WRITE表示写计数。

    该方案的基本思想为:
    1.存入资源增加WRITE。
    2.读取资源增加READ。
    3.WRITE和READ都只增不减。
    4.判断ARRAY存在空余空间,WRITE - READ < L。
    5.判断ARRAY为空,WRITE = READ 。
    6.定位读位置READ%L,定位写位置WRITE%L。

实际操作流程为:
初始化READ和WRITE为0。

线程A往R中存放一个资源
  1.判断数组中的资源未满。
  2.存放资源到ARRAY[WRITE%L]
  3.增加WRITE。
  线程B往R中读取一个资源
  1.判断数组中的资源不为空。
  2.存放资源到ARRAY[READ%L]
  3.增加READ。

   可能大家已经看出,上面的实现存在一个严重的问题,就是越界的问题,下面讨论解决方案:

    1.越界后WRITE - READ需要保证正确。
    大家知道无符号数有一个特性,
    0x00000000-0xffffffff = 1;
    0x00000000-0xfffffffe = 2;
    只要把READ和WRITE定义成无符号数,就能保证WRITE - READ在越界后保证正确性。

   2.WRITE%L 和 READ%L在越界后的正确性,我们需要保证以下等式成立:
0xffffffff%L = L -1

      为了保证以上等式成立,可以将L设成2的n次方,对应32位整数,n的取值范围为0~31. 由于限定L为2的n次方,WRITE%L 和 READ%L可以写成WRITE&(L-1) 和 READ%L&(L-1).

    讨论:
     该无锁实现对多线程编程常用的模型提出一种无锁实现,但在使用中还需注意一下几点:
    1.为防止程序从高速缓存中取值,必须将变量READ和WRITE定义成volatile类型。
    2.该方案要求缓冲区的长度为2的n次方,取值可以为1,2,4,8,16,32,64……,大部分时候可以满足应用上的需求。
    3.该方案目前只适用于基于数组的缓冲区结构。
    4.该方案目前只适一个读者,一个写者的情形,如果存在多个读者,多个写者,需要分别对读者和写者进行加锁,但是使用该方案还是可以减少锁的力度。


[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课

收藏
免费 5
支持
分享
最新回复 (10)
雪    币: 351
活跃值: (80)
能力值: ( LV9,RANK:210 )
在线值:
发帖
回帖
粉丝
2
Spin lock, Busying Waiting
2013-9-3 13:51
0
雪    币: 1283
活跃值: (46)
能力值: ( LV3,RANK:30 )
在线值:
发帖
回帖
粉丝
3
只能1个读一个写?如果有多个同时读,多个同时写的问题。不加锁,你这个貌似不行,实际上你这个设计就是类似于一个环形缓冲的模式,但是多个线程的情况下,不加锁只怕不行!
2013-9-3 14:10
0
雪    币: 191
活跃值: (55)
能力值: ( LV6,RANK:90 )
在线值:
发帖
回帖
粉丝
4
相对于环形缓冲:
读线程读取一个元素,需要修改count计数,修改count计数是需要加锁。
写线程写入一个元素,需要修改count计数,修改count计数是需要加锁。

该方法的最大优点是不加锁,不使用原子操作实现了count计数。

如果多线程同时读的话,是需要加锁的。多线程同时写,也是需要加锁的。
2013-9-3 15:36
0
雪    币: 101
活跃值: (10)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
mty
5
感觉楼主做了个无用功,或者说仅仅封装了个缓冲区而已。
原本是对m_data的读写,实际上转化成了对m_uWriteCount 和m_uReadCount 的读写,你有考虑过多线程对这两个变量的竞争吗?
2013-9-3 23:41
0
雪    币: 216
活跃值: (144)
能力值: ( LV10,RANK:160 )
在线值:
发帖
回帖
粉丝
6
樓上的就說的對了,不但是無用功,並且引發了新的臨界資源。。從一個臨界資源m_data變成了兩個臨界資源m_uWriteCount和m_uReadCount,lz應該微觀一點去看。不瞞你說,我大学时在上操作系統課的時候曾經想過一模一樣的東西。。

實際上就是分散臨界資源,當處理多個臨界資源的效率反而比處理一個臨界資源效率高(譬如分散后可以用Interlockedxxxx原子操作),那個時候就這樣設計;同樣道理,聚合臨界資源也是一樣。。但是解決的是同步效率問題,并不能解決臨界資源問題。。

說了一大堆,就一句話,如果你不想用Event-Wait這類會導致線程切換的同步方式的話,就必須得確保操作原子性,特別在內核編程中,不用"lock Instruction"這類原子操作,或者不搶佔一個CPU在執行指令,其他CPU執行無關代碼這一類的spinlock加鎖的話,無法確保真正多線程操作的原子性。。不管你怎麼設計。。
2013-9-4 22:50
0
雪    币: 191
活跃值: (55)
能力值: ( LV6,RANK:90 )
在线值:
发帖
回帖
粉丝
7
看了大家的评论,这种技术确实有诸多局限性,但是只要在实际中帮助解决一些问题,那他还是有用的。下面贴一个支持一个线程读,一个线程写的CycleBuffer,注意该CycleBuffer没有经过实际测试。
class ZwAsynCount
{
public:
	ZwAsynCount(unsigned uSize)	//uSize必须为2的n次方
	{
		m_uReadCount = 0;
		m_uWriteCount = 0;
		m_uSize = uSize;
	}
	int Write(int nLen)			//返回元素位置 -1表示读失败
	{
		int nRet = -1;

		if (GetFree() >= nLen)
		{
			nRet = m_uWriteCount&(m_uSize-1);
		}
		return nRet;
	}
	void AddWrite(int nCount = 1)
	{
		m_uWriteCount += nCount;
	}
	int GetUsed()
	{
		return m_uWriteCount - m_uReadCount;
	}
	int GetFree()
	{
		return m_uSize - GetUsed();
	}
	int Read(int nLen)		//返回元素位置 -1表示写失败
	{
		int nRet = -1;

		if (GetUsed() <= nLen)
		{
			nRet = m_uReadCount&(m_uSize-1);
		}

		return nRet;
	}
	void AddRead(int nCount = 1)
	{
		m_uReadCount+= nCount;
	}
private:
	unsigned m_uSize;
	volatile unsigned m_uReadCount;
	volatile unsigned m_uWriteCount;
};

class CCycleBuffer
{
public:
	CCycleBuffer(int nSize):m_asynCount(nSize)
	{
		m_pBuf = new char[nSize];  
		m_nTalLen = nSize;
	}
	int GetFreeSpace()
	{
		return m_asynCount.GetFree();
	}
	int GetLength()
	{
		return m_asynCount.GetUsed();
	}
	~CCycleBuffer()
	{
		if (m_pBuf)
		{
			delete m_pBuf;
		}
	}
	bool Write(char* buf, int count)
	{
		bool bRet = false;
		int  nPos = m_asynCount.Write(count);

		if (nPos >=0)
		{
			if (m_nTalLen - nPos >=  count)
			{
				memcpy(m_pBuf + nPos, buf, count);
			}
			else
			{
				memcpy(m_pBuf + nPos, buf, m_nTalLen - nPos);
				memcpy(m_pBuf, buf + m_nTalLen - nPos, count - (m_nTalLen - nPos));
			}

			m_asynCount.AddWrite(count);
			bRet = true;
		}

		return bRet;
	}
	bool Read(char* buf, int count)
	{
		bool bRet = false;
		int  nPos = m_asynCount.Read(count);

		if (nPos >=0)
		{
			if (m_nTalLen - nPos >=  count)
			{
				memcpy(buf,m_pBuf + nPos,count);
			}
			else
			{
				memcpy(buf,m_pBuf + nPos, m_nTalLen - nPos);
				memcpy(buf + m_nTalLen - nPos,m_pBuf, count - (m_nTalLen - nPos));
			}

			m_asynCount.AddRead(count);
			bRet = true;
		}

		return bRet;
	}
private:
	char	*m_pBuf;
	int		m_nTalLen;
	ZwAsynCount m_asynCount;
};
2013-9-5 13:46
0
雪    币: 63
活跃值: (17)
能力值: ( LV8,RANK:130 )
在线值:
发帖
回帖
粉丝
8
楼主的问题实质就是生产者、消费者模型
只不过要求无锁化,我这里提供一个思路:
把资源放到一个队列里面
Consumer从队列中pop出资源来使用,pop如果失败,表面当前没有资源,Sleep一段时间
Producer往队列中push资源。
由于只存在Push/Pop操作,因此可以使用单链表,单链表在X86体系是可以实现无锁的。WINDOWS提供了API InterlockedPushEntrySList InterlockedPopEntrySList可以直接使用。
这个思路可以满足多个线程同时读写。
代码我就不发了。
2013-9-5 14:06
0
雪    币: 185
活跃值: (25)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
9
这个就是lock free 。 相对来说是比较局限的。而且需要原子操作吧。
2013-10-10 15:28
0
雪    币: 264
活跃值: (10)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
10
这样吗??

/* Interlocked SList queue using keyed event signaling */
struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};
void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}
void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);
    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}
SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path
    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);
    assert(oldv == 0);
    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }
    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push
    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);
    return entry;
}
2013-10-10 17:13
0
雪    币: 200
活跃值: (38)
能力值: ( LV4,RANK:50 )
在线值:
发帖
回帖
粉丝
11
不仅是有局限性,而且在多线程下结果是不正确的。建议你看看无锁队列的实现,感觉这代码还是比较天真。
2013-10-11 02:32
0
游客
登录 | 注册 方可回帖
返回
//