Skip to content

Latest commit

 

History

History
370 lines (226 loc) · 11.3 KB

File metadata and controls

370 lines (226 loc) · 11.3 KB

算法平台2.1中的读取并发线程锁&进程锁的实现

源代码实现

#ifndef MYLOCK_H__ // 2017-11-15 16:13:57
#define MYLOCK_H__

/** @class CMyLock
 *  @brief 本锁既是线程锁也是进程锁,适用于一下4种情况
 *  1) 当一个(或多个)线程(或进程)正在写入数据时,其他任何线程(或进程)不能写入数据
    2) 当一个(或多个)线程(或进程)正在写入数据时,其他任何线程(或进程)不能读取数据
    3) 当一个(或多个)线程(或进程)正在读取数据时,其他任何线程(或进程)不能写入数据
    4) 当一个(或多个)线程(或进程)正在读取数据时,其他线程(或进程)也能够读取数据。
 */
class CMyLock
{
public:

    explicit CMyLock(const char * const strLockID = 0); // 构造函数

    virtual ~CMyLock();             // 析构函数

    void ReadStart();               // 读取开始,必须与ReadEnd成对使用

    void ReadEnd();                 // 读取结束,必须与ReadStart成对使用

    void WriteStart();              // 写入开始,必须与WriteEnd成对使用

    void WriteEnd();                // 写入结束,必须与WriteStart成对使用

private:

    void *  m_hMutex;               // 本类的成员变量进程锁

    void *  m_hFileMapping;         // 获取文件映射从而获取成员变量

    void *  m_hSemReaders;          // 所有读取者应该等待的信号量,该信号量由WriteEnd触发

    void *  m_hSemWriters;          // 所有写入者应该等待的信号量,该信号量由ReadEnd或WriteEnd触发

}; // CMyLock

#endif // MYLOCK_H__
#include "MyLock.h"
#include <Windows.h>
#include <assert.h>
#include <stdio.h>

typedef struct MY_FILE_MAPPING_INFO__
{
    int     m_nWaitingReaders;

    int     m_nWaitingWriters;

    int     m_nActiveReaders;

    int     m_nActiveWriters;

} MY_FILE_MAPPING_INFO;

#define MAX_FILE_MAPPING_LENGTH sizeof(MY_FILE_MAPPING_INFO)

CMyLock::CMyLock(const char * const strLockID)
{
    assert(strLockID);

    char strMutexName[MAX_PATH] = {0};

    char strFileMappingName[MAX_PATH] = {0};

    char strSemReadersName[MAX_PATH] = {0};

    char strSemWritersName[MAX_PATH] = {0};

    _snprintf_s(strMutexName, MAX_PATH, MAX_PATH - 1, "%s+Mutex");

    _snprintf_s(strFileMappingName, MAX_PATH, MAX_PATH - 1, "%s+FileMapping");

    _snprintf_s(strSemReadersName, MAX_PATH, MAX_PATH - 1, "%s+SemReaders");

    _snprintf_s(strSemWritersName, MAX_PATH, MAX_PATH - 1, "%s+SemWriters");

    m_hMutex            = CreateMutex(0, 0, strMutexName);   assert(m_hMutex);

    WaitForSingleObject(m_hMutex, INFINITE);

    m_hFileMapping      = CreateFileMapping(INVALID_HANDLE_VALUE, 
                                            0,
                                            PAGE_READWRITE,
                                            0,
                                            MAX_FILE_MAPPING_LENGTH,
                                            strFileMappingName); assert(m_hFileMapping);

    if (NO_ERROR == GetLastError())
    {
        void * hMapView = MapViewOfFile(m_hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);  assert(hMapView);

        MY_FILE_MAPPING_INFO * pstFileMappingInfo = (MY_FILE_MAPPING_INFO *)hMapView;

        memset(pstFileMappingInfo, 0, sizeof(MY_FILE_MAPPING_INFO));

        UnmapViewOfFile(hMapView);
    }

    m_hSemReaders       = CreateSemaphore(NULL, 0, MAXLONG, strSemReadersName);    assert(m_hSemReaders);

    m_hSemWriters       = CreateSemaphore(NULL, 0, MAXLONG, strSemWritersName);   assert(m_hSemWriters);

    ReleaseMutex(m_hMutex);
}

CMyLock::~CMyLock()
{
    CloseHandle(m_hSemWriters);

    CloseHandle(m_hSemReaders);

    CloseHandle(m_hFileMapping);

    CloseHandle(m_hMutex);
}

void CMyLock::ReadStart()
{
    WaitForSingleObject(m_hMutex, INFINITE);

    void * hMapView = MapViewOfFile(m_hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);  assert(hMapView);

    MY_FILE_MAPPING_INFO * pstFileMappingInfo = (MY_FILE_MAPPING_INFO *)hMapView;   assert(pstFileMappingInfo);

    bool bWait = (pstFileMappingInfo->m_nActiveWriters | pstFileMappingInfo->m_nWaitingWriters);

    if (bWait)  // 读比写的优先级低,所以由Waiting的Writers,那么Reader等待
    {
        pstFileMappingInfo->m_nWaitingReaders++;
    }
    else
    {
        pstFileMappingInfo->m_nActiveReaders++;
    }

    UnmapViewOfFile(hMapView);

    ReleaseMutex(m_hMutex);

    if (bWait)  // 等待WriteEnd释放m_hSemReaders信号量
    {
        WaitForSingleObject(m_hSemReaders, INFINITE);
    }
}

void CMyLock::ReadEnd()
{
    WaitForSingleObject(m_hMutex, INFINITE);

    void * hMapView = MapViewOfFile(m_hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);  assert(hMapView);

    MY_FILE_MAPPING_INFO * pstFileMappingInfo = (MY_FILE_MAPPING_INFO *)hMapView;   assert(pstFileMappingInfo);

    pstFileMappingInfo->m_nActiveReaders--;

    void *  hTempSem            = NULL;
    int     nReleaseSemCount    = 0;
    if (0 == pstFileMappingInfo->m_nActiveReaders)  // 所有读取者读取动作完成,需要释放信号量让其他对象获得权限
    {
        if (0 < pstFileMappingInfo->m_nWaitingWriters)      // 写入者优先级高
        {
            pstFileMappingInfo->m_nWaitingWriters--;

            pstFileMappingInfo->m_nActiveWriters++;         // 相当于m_nActiveWriters = 1

            hTempSem            = m_hSemWriters;

            nReleaseSemCount    = 1;    // 一次释放一个写入信号量
        }
        else if (0 < pstFileMappingInfo->m_nWaitingReaders)
        {
            pstFileMappingInfo->m_nActiveReaders    = pstFileMappingInfo->m_nWaitingReaders;

            pstFileMappingInfo->m_nWaitingReaders   = 0;

            hTempSem            = m_hSemReaders;

            nReleaseSemCount    = pstFileMappingInfo->m_nActiveReaders; // 一次性释放所有读取信号量
        }
    }

    UnmapViewOfFile(hMapView);

    ReleaseMutex(m_hMutex);

    if (hTempSem)
    {
        ReleaseSemaphore(hTempSem, nReleaseSemCount, NULL);
    }
}

void CMyLock::WriteStart()
{
    WaitForSingleObject(m_hMutex, INFINITE);

    void * hMapView = MapViewOfFile(m_hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);  assert(hMapView);

    MY_FILE_MAPPING_INFO * pstFileMappingInfo = (MY_FILE_MAPPING_INFO *)hMapView;   assert(pstFileMappingInfo);

    bool bWait = (pstFileMappingInfo->m_nActiveReaders | pstFileMappingInfo->m_nActiveWriters | pstFileMappingInfo->m_nWaitingWriters);

    if (bWait)  // 读比写的优先级低,所以不用管是否有Waiting状态的Readers
    {
        pstFileMappingInfo->m_nWaitingWriters++;
    }
    else
    {
        pstFileMappingInfo->m_nActiveWriters++;
    }

    UnmapViewOfFile(hMapView);

    ReleaseMutex(m_hMutex);

    if (bWait)  // 读比写的优先级低,所以不用管是否有Waiting状态的Readers
    {
        WaitForSingleObject(m_hSemWriters, INFINITE);
    }
}

void CMyLock::WriteEnd()
{
    WaitForSingleObject(m_hMutex, INFINITE);

    void * hMapView = MapViewOfFile(m_hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);  assert(hMapView);

    MY_FILE_MAPPING_INFO * pstFileMappingInfo = (MY_FILE_MAPPING_INFO *)hMapView;   assert(pstFileMappingInfo);

    pstFileMappingInfo->m_nActiveWriters--;

    void *  hTempSem            = NULL;
    int     nReleaseSemCount    = 0;
    if (0 == pstFileMappingInfo->m_nActiveWriters)  // 本次写入完成,需要释放信号量让其他对象获得权限
    {
        if (0 < pstFileMappingInfo->m_nWaitingWriters)      // 写入者优先级高
        {
            pstFileMappingInfo->m_nWaitingWriters--;

            pstFileMappingInfo->m_nActiveWriters++;         // 相当于m_nActiveWriters = 1

            hTempSem            = m_hSemWriters;

            nReleaseSemCount    = 1;    // 一次释放一个写入信号量
        }
        else if (0 < pstFileMappingInfo->m_nWaitingReaders)
        {
            pstFileMappingInfo->m_nActiveReaders    = pstFileMappingInfo->m_nWaitingReaders;

            pstFileMappingInfo->m_nWaitingReaders   = 0;

            hTempSem            = m_hSemReaders;

            nReleaseSemCount    = pstFileMappingInfo->m_nActiveReaders; // 一次性释放所有读取信号量
        }
    }

    UnmapViewOfFile(hMapView);

    ReleaseMutex(m_hMutex);

    if (hTempSem)
    {
        ReleaseSemaphore(hTempSem, nReleaseSemCount, NULL);
    }
}

案例简述

算法平台2.1有三个特点:一,进程多(可达到30个以上);二,每个进程的线程多(可达到10个以上);三,多进程与多线程之间数据交互频繁(分布式数据通信)。

如此高并发的数据通信使得数据的安全性尤为重要,一个直观的解决办法就是将数据增加线程锁以及进程锁。

在算法平台2.0中采用的线程锁是关键代码段,采用的进程锁是内核对象互斥体。

但是这两种锁都有一个重要的缺陷,那就是影响并发的高效性,使得并发变成了一个轮询的同步效果而已。

亦即:

当一个线程(或进程)正在写入或读取数据时,其他任何线程(或进程)不能写入或读取数据

然而实际上,我们要在保证数据安全的前提下实现真正的读取并发(注意:非写入并发)。

比如:

1) 当一个(或多个)线程(或进程)正在写入数据时,其他任何线程(或进程)不能写入数据
2) 当一个(或多个)线程(或进程)正在写入数据时,其他任何线程(或进程)不能读取数据
3) 当一个(或多个)线程(或进程)正在读取数据时,其他任何线程(或进程)不能写入数据
4) 当一个(或多个)线程(或进程)正在读取数据时,其他任何线程(或进程)能够读取数据(*)

案例分析和解决过程

案例分析

算法平台示例:

将读取并发线程锁&进程锁效果再次声明一下:

1) 当一个(或多个)线程(或进程)正在写入数据时,其他任何线程(或进程)不能写入数据
2) 当一个(或多个)线程(或进程)正在写入数据时,其他任何线程(或进程)不能读取数据
3) 当一个(或多个)线程(或进程)正在读取数据时,其他任何线程(或进程)不能写入数据
4) 当一个(或多个)线程(或进程)正在读取数据时,其他任何线程(或进程)能够读取数据(*)

要想达此种效果,我们就不能简单地使用关键代码段或者互斥进程锁了。

我们需要在此基础上增加一些判断条件,将所有读写情况归纳为上述4种情况。

当遇到前三种情况时,我们必须让线程(或进程)等待,待数据安全后方可获取读取权限。

当遇到最后一种情况时,线程(或进程)无须等待,直接获取读取权限。

要达到这种等待效果,我们使用内核对象信号量实现。

解决过程

需求整理: !我的图片

类设计:

功能实现:

读取开始

读取结束

写入开始

写入结束

示例展示:

经验总结,预防措施和建议

从本文可导出的检查项

1. 进程锁的实现为原创,使用了较多的assert,并未对异常做过多保护,后续完善。
2. 后续可以将线程锁和进程锁合并为一个类,以构造函数的输入参数作为区分。