当前位置:首页 > 编程笔记 > 正文
已解决

【服务器】thread线程模块

来自网友在路上 152852提问 提问时间:2023-11-08 09:27:29阅读次数: 52

最佳答案 问答题库528位专家为你答疑解惑

thread线程模块

以下是从sylar服务器中学的,对其的复习;

  • sylar的thread线程模块是基于pthread实现的

前置知识—>推荐阅读

Thread线程类

线程类,构造函数传入线程入口函数和线程名称,线程入口函数类型为void(),如果带参数,则需要用std::bind进行绑定。线程类构造之后线程即开始运行,构造函数在线程真正开始运行之后返回

/*** @brief 线程类*/
class Thread : Noncopyable {
public:/// 线程智能指针类型typedef std::shared_ptr<Thread> ptr;/*** @brief 构造函数* @param[in] cb 线程执行函数* @param[in] name 线程名称*/Thread(std::function<void()> cb, const std::string& name);/*** @brief 析构函数*/~Thread();/*** @brief 线程ID*/pid_t getId() const { return m_id;}/*** @brief 线程名称*/const std::string& getName() const { return m_name;}/*** @brief 等待线程执行完成*/void join();/*** @brief 获取当前的线程指针*/static Thread* GetThis();/*** @brief 获取当前的线程名称*/static const std::string& GetName();/*** @brief 设置当前线程名称* @param[in] name 线程名称*/static void SetName(const std::string& name);
private:/*** @brief 线程执行函数*/static void* run(void* arg);
private:/// 线程idpid_t m_id = -1;/// 线程结构pthread_t m_thread = 0;/// 线程执行函数std::function<void()> m_cb;/// 线程名称std::string m_name;/// 信号量Semaphore m_semaphore;
};

提供两个线程局部变量,可通过Get、Set方法对其进行设置和获取;

static thread_local Thread* t_thread = nullptr;
static thread_local std::string t_thread_name = "UNKNOW";static sylar::Logger::ptr g_logger = SYLAR_LOG_NAME("system");Thread* Thread::GetThis() {return t_thread;
}const std::string& Thread::GetName() {return t_thread_name;
}void Thread::SetName(const std::string& name) {if(name.empty()) {return;}if(t_thread) {t_thread->m_name = name;}t_thread_name = name;
}Thread::Thread(std::function<void()> cb, const std::string& name):m_cb(cb),m_name(name) {if(name.empty()) {m_name = "UNKNOW";}int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);if(rt) {SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt<< " name=" << name;throw std::logic_error("pthread_create error");}m_semaphore.wait();
}Thread::~Thread() {if(m_thread) {pthread_detach(m_thread);}
}void Thread::join() {if(m_thread) {int rt = pthread_join(m_thread, nullptr);if(rt) {SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, rt=" << rt<< " name=" << m_name;throw std::logic_error("pthread_join error");}m_thread = 0;}
}void* Thread::run(void* arg) {Thread* thread = (Thread*)arg;t_thread = thread;t_thread_name = thread->m_name;thread->m_id = sylar::GetThreadId();pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());std::function<void()> cb;cb.swap(thread->m_cb);thread->m_semaphore.notify();cb();return 0;
}

线程互斥

Mutex: 互斥锁,基于pthread_mutex_t实现
RWMutex: 读写锁,基于pthread_rwlock_t实现
Spinlock: 自旋锁,基于pthread_spinlock_t实现
CASLock: 原子锁,基于std::atomic_flag实现

  • 以下对锁的封装能够使用类的构造函数来加锁,析构函数进行释放锁

互斥锁Mutex

/*** @brief 互斥量*/
class Mutex : Noncopyable {
public: /// 局部锁typedef ScopedLockImpl<Mutex> Lock;/*** @brief 构造函数*/Mutex() {pthread_mutex_init(&m_mutex, nullptr);}/*** @brief 析构函数*/~Mutex() {pthread_mutex_destroy(&m_mutex);}/*** @brief 加锁*/void lock() {pthread_mutex_lock(&m_mutex);}/*** @brief 解锁*/void unlock() {pthread_mutex_unlock(&m_mutex);}pthread_mutex_t getMutex() const { return m_mutex; }
private:/// mutexpthread_mutex_t m_mutex;
};

读写锁RWMutex

  • 读操作可并发重入,写操作是互斥的
/*** @brief 读写互斥量*/
class RWMutex : Noncopyable{
public:/// 局部读锁typedef ReadScopedLockImpl<RWMutex> ReadLock;/// 局部写锁typedef WriteScopedLockImpl<RWMutex> WriteLock;/*** @brief 构造函数*/RWMutex() {pthread_rwlock_init(&m_lock, nullptr);}/*** @brief 析构函数*/~RWMutex() {pthread_rwlock_destroy(&m_lock);}/*** @brief 上读锁*/void rdlock() {pthread_rwlock_rdlock(&m_lock);}/*** @brief 上写锁*/void wrlock() {pthread_rwlock_wrlock(&m_lock);}/*** @brief 解锁*/void unlock() {pthread_rwlock_unlock(&m_lock);}private:/// 读写锁pthread_rwlock_t m_lock;
};

局部锁

/*** @brief 局部锁的模板实现*/
template<class T>
struct ScopedLockImpl {
public:/*** @brief 构造函数* @param[in] mutex Mutex*/ScopedLockImpl(T& mutex):m_mutex(mutex) {m_mutex.lock();m_locked = true;}/*** @brief 析构函数,自动释放锁*/~ScopedLockImpl() {unlock();}/*** @brief 加锁*/void lock() {if(!m_locked) {m_mutex.lock();m_locked = true;}}/*** @brief 解锁*/void unlock() {if(m_locked) {m_mutex.unlock();m_locked = false;}}
private:/// mutexT& m_mutex;/// 是否已上锁bool m_locked;
};/*** @brief 局部读锁模板实现*/
template<class T>
struct ReadScopedLockImpl {
public:/*** @brief 构造函数* @param[in] mutex 读写锁*/ReadScopedLockImpl(T& mutex):m_mutex(mutex) {m_mutex.rdlock();m_locked = true;}/*** @brief 析构函数,自动释放锁*/~ReadScopedLockImpl() {unlock();}/*** @brief 上读锁*/void lock() {if(!m_locked) {m_mutex.rdlock();m_locked = true;}}/*** @brief 释放锁*/void unlock() {if(m_locked) {m_mutex.unlock();m_locked = false;}}
private:/// mutexT& m_mutex;/// 是否已上锁bool m_locked;
};/*** @brief 局部写锁模板实现*/
template<class T>
struct WriteScopedLockImpl {
public:/*** @brief 构造函数* @param[in] mutex 读写锁*/WriteScopedLockImpl(T& mutex):m_mutex(mutex) {m_mutex.wrlock();m_locked = true;}/*** @brief 析构函数*/~WriteScopedLockImpl() {unlock();}/*** @brief 上写锁*/void lock() {if(!m_locked) {m_mutex.wrlock();m_locked = true;}}/*** @brief 解锁*/void unlock() {if(m_locked) {m_mutex.unlock();m_locked = false;}}
private:/// MutexT& m_mutex;/// 是否已上锁bool m_locked;
};

自旋锁Spinlock

  • 线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等
    待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁

使用:

  • 临界区持锁时间非常短且CPU资源不紧张的情况下;
/*** @brief 自旋锁*/
class Spinlock : Noncopyable {
public:/// 局部锁typedef ScopedLockImpl<Spinlock> Lock;/*** @brief 构造函数*/Spinlock() {pthread_spin_init(&m_mutex, 0);}/*** @brief 析构函数*/~Spinlock() {pthread_spin_destroy(&m_mutex);}/*** @brief 上锁*/void lock() {pthread_spin_lock(&m_mutex);}/*** @brief 解锁*/void unlock() {pthread_spin_unlock(&m_mutex);}
private:/// 自旋锁pthread_spinlock_t m_mutex;
};

原子锁CASLock

  • 对于简单的变量使用,当前指令没有执行完,不会让其他线程执行;
/*** @brief 原子锁*/
class CASLock : Noncopyable {
public:/// 局部锁typedef ScopedLockImpl<CASLock> Lock;/*** @brief 构造函数*/CASLock() {m_mutex.clear();}/*** @brief 析构函数*/~CASLock() {}/*** @brief 上锁*/void lock() {while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));}/*** @brief 解锁*/void unlock() {std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);}
private:/// 原子状态volatile std::atomic_flag m_mutex;
};

线程通信

Semaphore: 计数信号量,基于sem_t实现

信号量Semaphore

/*** @brief 信号量*/
class Semaphore : Noncopyable {
public:/*** @brief 构造函数* @param[in] count 信号量值的大小*/Semaphore(uint32_t count = 0);/*** @brief 析构函数*/~Semaphore();/*** @brief 获取信号量*/void wait();/*** @brief 释放信号量*/void notify();
private:sem_t m_semaphore;
};/** 信号量封装使用条件变量和锁 */
class OwnSemaphore : Noncopyable {
public:typedef Mutex MutexType;OwnSemaphore(size_t count=0);~OwnSemaphore();void wait();void notify();size_t getCount() const { return m_count; }void reset() { m_count = 0;}private:size_t m_count;MutexType m_mutex;Cond m_cond;
};Semaphore::Semaphore(uint32_t count) {if(sem_init(&m_semaphore, 0, count)) {throw std::logic_error("sem_init error");}
}Semaphore::~Semaphore() {sem_destroy(&m_semaphore);
}void Semaphore::wait() {if(sem_wait(&m_semaphore)) {throw std::logic_error("sem_wait error");}
}void Semaphore::notify() {if(sem_post(&m_semaphore)) {throw std::logic_error("sem_post error");}
}OwnSemaphore::OwnSemaphore(size_t count):m_count(count){
}OwnSemaphore::~OwnSemaphore() {
}void OwnSemaphore::wait() {MutexType::Lock lock(m_mutex);while (m_count == 0) {m_cond.wait(m_mutex.getMutex());}--m_count;
}void OwnSemaphore::notify() {m_count++;m_cond.signal();
}

条件变量 Cond

/** 封装条件变量 */
class Cond {
public:Cond();void wait(pthread_mutex_t mutex);void signal();~Cond();private:pthread_cond_t m_cond;
};Cond::Cond() {pthread_cond_init(&m_cond, NULL);
}void Cond::wait(pthread_mutex_t mutex) {pthread_cond_wait(&m_cond, &mutex);
}void Cond::signal() {pthread_cond_signal(&m_cond);
}Cond::~Cond() {pthread_cond_destroy(&m_cond);
}

查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"【服务器】thread线程模块":http://eshow365.cn/6-35220-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!