muduo源码剖析之AsyncLogging异步日志类
最佳答案 问答题库528位专家为你答疑解惑
简介
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。
多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
后端也有2个BufferPtr,分别为newBuffer1和newBuffer2,还有一个BufferVector(buffersToWrite)。后端线程在收到前端通知之后,利用buffersToWrite和buffers_进行交换,并且用newBuffer1和newBuffer2归还给前端的currentBuffer_和nextBuffer_,然后把日志写入文件。
muduo日志文件只提供写入本地文件
后端线程写入条件:
- 前端线程缓冲区写完后通过条件变量通知后端线程写入
- 超时,muduo设置的是默认时间是3秒(AsyncLogging构造函数第三个参数flushInterval_),
源码剖析
AsyncLogging.h
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)#ifndef MUDUO_BASE_ASYNCLOGGING_H
#define MUDUO_BASE_ASYNCLOGGING_H#include "muduo/base/BlockingQueue.h"
#include "muduo/base/BoundedBlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Mutex.h"
#include "muduo/base/Thread.h"
#include "muduo/base/LogStream.h"#include <atomic>
#include <vector>namespace muduo
{class AsyncLogging : noncopyable
{public:AsyncLogging(const string& basename,off_t rollSize,int flushInterval = 3);~AsyncLogging(){if (running_){stop();}}void append(const char* logline, int len);void start(){running_ = true;thread_.start();latch_.wait();//保证后端线程已经开始运行}void stop() NO_THREAD_SAFETY_ANALYSIS{running_ = false;cond_.notify();thread_.join();}private:void threadFunc();typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;typedef std::vector<std::unique_ptr<Buffer>> BufferVector;typedef BufferVector::value_type BufferPtr;//存储超时时间变量const int flushInterval_; //后端线程启动标志std::atomic<bool> running_;//日志文件名const string basename_;//预留的日志大小const off_t rollSize_;//调用后端写入线程muduo::Thread thread_;//该变量保证后端日志写入线程已经运行muduo::CountDownLatch latch_;//条件变量与互斥锁,用来保证前端线程与后端线程的线程同步muduo::MutexLock mutex_;muduo::Condition cond_ GUARDED_BY(mutex_);//前端线程当前写入缓冲区BufferPtr currentBuffer_ GUARDED_BY(mutex_);//前端线程下一个备用缓冲区BufferPtr nextBuffer_ GUARDED_BY(mutex_);//带写入文件已填满的缓冲区队列BufferVector buffers_ GUARDED_BY(mutex_);
};} // namespace muduo#endif // MUDUO_BASE_ASYNCLOGGING_H
AsyncLogging.cc
前端线程
前端在生成一条日志消息的时候会调用AsyncLogging::append()。在这个函数中,如果当前缓冲(currentBuff_)剩余的空间足够大,则会直接把日志消息拷贝(追加)到当前缓冲中,这是最常见的情况。
这里拷贝一条日志消息并不会带来多大开销。前后端代码的其余部分都没有拷贝,而是简单的指针交换。否则,说明当前缓冲已经写满,就把它送入(移入)buffers,并试图把预备好的另一块缓冲(nextBuffer_)移用(move)为当前缓冲,然后追加日志消息并通知(唤醒)后端开始写入日志数据。
以上两种情况在临界区之内都没有耗时的操作,运行时间为常数。
如果前端写入速度太快,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作为当前缓冲,这是极少发生的情况
后端线程
首先准备好两块空闲的buffer,以备在临界区内交换。
在临界区内,等待条件触发,这里的条件有两个:其一是超时,其二是前端写满了一个或多个buffer。
注意这里是非常规的conditionvariable用法,它没有使用while循环,而且等待时间有上限。当“条件”满足时,先将当前缓冲(currentBuffe_)移入buffers_,并立刻将空闲的newBuffer1移为当前缓冲。
注意这整段代码位于临界区之内,因此不会有任何race condition。接下来将buffer_与buffersToWrite交换,后面的代码可以在临界区之外安全地访问buffersToWrite,将其中的日志数据写入文件。
临界区里最后干的一件事情是用newBuffer2替换nextBuffer,这样前端始终有一个预备buffer可供调配。nextBuffer_可以减少前端临界区分配内存的概率,缩短前端临界区长度。注意到后端临界区内也没有耗时的操作,运行时间为常数。会buffersToWrite内的buffer重新填充newBuffer1和newBuffer2,这样下一次执行的时候还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。
最后,这四个缓冲在程序启动的时候会全部填充为0,这样可以避免程序热身时page fault引发性能不稳定。
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)#include "muduo/base/AsyncLogging.h"
#include "muduo/base/LogFile.h"
#include "muduo/base/Timestamp.h"#include <stdio.h>using namespace muduo;//异步日志
AsyncLogging::AsyncLogging(const string& basename,off_t rollSize,int flushInterval): flushInterval_(flushInterval),//后端线程超时时间变量,默认为3srunning_(false),//后端线程启动标志basename_(basename),//设置输出日志文件名rollSize_(rollSize),// 预留的日志大小thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),// 执行该异步日志记录器的线程latch_(1),//该变量作用是保证后端线程已进入mutex_(),cond_(mutex_),currentBuffer_(new Buffer), //当前缓冲区nextBuffer_(new Buffer),//预备缓冲区buffers_()//缓冲区队列
{currentBuffer_->bzero(); //清空nextBuffer_->bzero();//清空buffers_.reserve(16);//设置缓冲区队列大小为16
}//所有LOG_*最终都会调用append函数
void AsyncLogging::append(const char* logline, int len)
{muduo::MutexLockGuard lock(mutex_);if (currentBuffer_->avail() > len) // 如果当前buffer还有空间,就添加到当前日志{currentBuffer_->append(logline, len);//调用vector的append}else{//将使用完后的buffer添加到 buffer vector后buffers_.push_back(std::move(currentBuffer_));if (nextBuffer_) // 重新设置当前buffer{currentBuffer_ = std::move(nextBuffer_);}else{currentBuffer_.reset(new Buffer); // Rarely happens//如果前端写入速度太快了,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作当前缓冲,这是极少发生的情况}currentBuffer_->append(logline, len);// 通知日志线程,有数据可写cond_.notify();}
}void AsyncLogging::threadFunc() // 线程调用的函数,主要用于周期性的flush数据到日志文件中
{assert(running_ == true);latch_.countDown();LogFile output(basename_, rollSize_, false);//打开日志文件BufferPtr newBuffer1(new Buffer); //这两个是后台线程的buffer BufferPtr newBuffer2(new Buffer);newBuffer1->bzero();//clearnewBuffer2->bzero();//clearBufferVector buffersToWrite; //用来和前台线程的buffers_进行swapbuffersToWrite.reserve(16); //预留空间while (running_){assert(newBuffer1 && newBuffer1->length() == 0);assert(newBuffer2 && newBuffer2->length() == 0);assert(buffersToWrite.empty());{muduo::MutexLockGuard lock(mutex_);//如果buffer为空,那么表示没有数据需要写入文件,那么就等待指定的时间(默认三秒)if (buffers_.empty()) // unusual usage!{cond_.waitForSeconds(flushInterval_);}//无论cond是因何而醒来,都要将currentBuffer_放到buffers_中。 //如果是因为时间到而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。 //如果已经有一个前台buffer满了,那么在前台线程中就已经把一个前台buffer放到buffers_中 //了。此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer, //因为在前台线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)buffers_.push_back(std::move(currentBuffer_)); //currentBuffer_是当前缓冲区/*---归还一个buffer---*/ // 将新的buffer转成当前缓冲区currentBuffer_ = std::move(newBuffer1);//使用新的未使用的 buffersToWrite 交换 buffers_,将buffers_中的数据在异步线程中写入LogFile中buffersToWrite.swap(buffers_);//内部指针交换,而非复制if (!nextBuffer_){nextBuffer_ = std::move(newBuffer2);/*-----假如需要,归还第二个----*/}}assert(!buffersToWrite.empty());// 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除 // 消息堆积//前端陷入死循环,拼命发送日志消息,超过后端的处理能力//这是典型的生产速度超过消费速度,会造成数据在内存中的堆积//严重时引发性能问题(可用内存不足),//或程序崩溃(分配内存失败)if (buffersToWrite.size() > 25){char buf[256];snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",Timestamp::now().toFormattedString().c_str(),buffersToWrite.size()-2);fputs(buf, stderr);output.append(buf, static_cast<int>(strlen(buf)));// 丢掉多余日志,以腾出内存,仅保留两块缓冲区buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());}// 将buffersToWrite的数据写入到日志中for (const auto& buffer : buffersToWrite){// FIXME: use unbuffered stdio FILE ? or use ::writev ?output.append(buffer->data(), buffer->length());}// 重新调整buffersToWrite的大小 if (buffersToWrite.size() > 2){// drop non-bzero-ed buffers, avoid trashingbuffersToWrite.resize(2);}if (!newBuffer1){assert(!buffersToWrite.empty());// 从buffersToWrite中弹出一个作为newBuffer1newBuffer1 = std::move(buffersToWrite.back());buffersToWrite.pop_back();newBuffer1->reset();// 清理newBuffer1}//前台buffer是由newBuffer1 2 归还的。现在把buffersToWrite的buffer归还给后台bufferif (!newBuffer2){assert(!buffersToWrite.empty());newBuffer2 = std::move(buffersToWrite.back());buffersToWrite.pop_back();newBuffer2->reset();}buffersToWrite.clear();output.flush();//刷新日志文件(写入)}output.flush();
}
如果日志消息堆积怎么办
万一前端陷入死循环,拼命发送日志消息,超过后端的处理(输出)能力,会导致什么后果?对于同步日志来说,这不是问题,因为阻塞IO自然就限制了前端的写入速度,起到了节流阀的作用。但是对于异步日志来说,这就是典型的生产速度高于消费速度问题,会造成数据在内存中堆积,严重时引发性能问题(可用内存不足)或程序崩溃(分配内存失败)。
muduo日志库处理日志堆积的方法很简单:直接丢掉多余的日志buffer,以腾出内存,见这样可以防止日志库本身引起程序故障,是一种自我保护措施。
99%的人还看了
相似问题
猜你感兴趣
版权申明
本文"muduo源码剖析之AsyncLogging异步日志类":http://eshow365.cn/6-23723-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!
- 上一篇: 解决jsonp跨域中的安全漏洞(包含meta解释)
- 下一篇: Leetcode链表问题汇总