【 OpenGauss源码学习 —— 列存储(CUStorage)】
最佳答案 问答题库738位专家为你答疑解惑
列存储(CUStorage)
- 概述
- CUStorage::SaveCU 函数
- CUStorage::GetFileName 函数
- CUStorage::OverwriteCU 函数
- CUStorage::OverwriteCU 函数
- CUStorage::RemoteLoadCU 函数
- CUStorage::Load 函数
- CUStorage::WSLoad 函数
- CUStorage::TruncateDataFile 函数
- CUStorage::IsDataFileExist 函数
- CUStorage::GetBcmFileName 函数
- CUStorage::TruncateBcmFile 函数
- CUStorage::IsBcmFileExist 函数
- 总结
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档和一些参考资料
概述
在学习完 CU 和 CStore 后,我们紧接着来了解一下 CUStorage 类。通常,CStore 类作为整个列式存储引擎的核心,通过管理 CU 和 CUDesc 来实现对列式存储数据的存储、检索和操作。CUStorage 类可能提供了底层的物理存储和读写操作。
CUStorage 类是数据库内核中与列存储(Column Store)相关的实现之一。以下是该类的主要职责和功能:
- 存储管理: 通过 SaveCU 和 OverwriteCU 方法,将列存储(CU,Compression Unit)数据写入存储文件。可以选择直接写入或用于扩展。
- 加载数据: 通过 LoadCU 和 RemoteLoadCU 方法,从存储中加载列存储数据。可以选择直接加载或用于缓存。
- 文件操作: 包括创建文件、打开文件、关闭文件等操作,通过 CreateFile、OpenFile、CloseFile 等私有成员函数实现。
- 空间分配: 使用 CStoreFreeSpace 类来管理存储中的空闲空间,通过 InitCstoreFreeSpace 和其他相关方法初始化和管理。
- 文件信息获取: 通过 GetFileName、GetBcmFileName、IsDataFileExist、IsBcmFileExist 等方法获取列文件的名称和相关信息。
- 其他功能: 包括分配空间、刷新文件、获取文件描述符、设置分配策略等。
以下为 CUStorage 类的函数源码:(路径:src/include/storage/custorage.h
)
class CUStorage : public BaseObject {
public:// 构造函数,初始化 CUStorage 对象CUStorage(const CFileNode& cFileNode, CStoreAllocateStrategy strategy = APPEND_ONLY);// 析构函数,释放 CUStorage 对象占用的资源virtual ~CUStorage();// 销毁函数,用于在析构对象时执行额外的清理操作virtual void Destroy();// 声明 CUFile 为友元类,使其可以访问 CUStorage 的私有成员friend class CUFile;// 将 CU 数据写入存储void SaveCU(_in_ char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension = false);// 重写 CU 数据到存储,通常用于扩展操作void OverwriteCU(_in_ char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension = false);// 从存储加载 CU 数据void LoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache);// 远程加载 CU 数据,通常用于分布式存储void RemoteLoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache);// 从文件加载数据到缓冲区void Load(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag);// 在写时加载数据到缓冲区int WSLoad(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag);// 获取文件名void GetFileName(_out_ char* fileName, _in_ const size_t capacity, _in_ const int fileId) const;// 检查数据文件是否存在bool IsDataFileExist(int fileId) const;// 获取 BCM 文件名void GetBcmFileName(_out_ char* bcmfile, _in_ int fileId) const;// 检查 BCM 文件是否存在bool IsBcmFileExist(_in_ int fileId) const;// 获取列文件名const char* GetColumnFileName() const;// 分配指定大小的空间uint64 AllocSpace(_in_ int size);// 刷新数据文件void FlushDataFile() const;// 设置分配策略void SetAllocateStrategy(CStoreAllocateStrategy strategy){m_strategy = strategy;};// 设置自由空间管理器void SetFreeSpace(CStoreFreeSpace* fspace){Assert(fspace != NULL);m_freespace = fspace;};// 获取自由空间管理器FORCE_INLINE CStoreFreeSpace* GetFreeSpace(){return m_freespace;};// 创建存储void CreateStorage(int fileId, bool isRedo) const;// 获取 CU 文件的文件描述符File GetCUFileFd(uint64 offset);// 获取 CU 在文件中的偏移量uint64 GetCUOffsetInFile(uint64 offset) const;// 检查 CU 是否存储在同一文件中bool IsCUStoreInOneFile(uint64 offset, int size) const;// 获取对齐的 CU 偏移量uint64 GetAlignCUOffset(uint64 offset) const;// 获取对齐的 CU 大小int GetAlignCUSize(int size) const;// 快速扩展文件大小void FastExtendFile(uint64 extend_offset, uint32 size, bool keep_size);// 截断数据文件void TruncateDataFile();// 截断 BCM 文件void TruncateBcmFile();// 设置是否启用 2 字节对齐void Set2ByteAlign(bool is_2byte_align);// 检查是否启用 2 字节对齐bool Is2ByteAlign();private:// 初始化文件名前缀void InitFileNamePrefix(_in_ const CFileNode& cFileNode);// 创建文件File CreateFile(_in_ char* file_name, _in_ int fileId, bool isRedo) const;// 打开文件File OpenFile(_in_ char* file_name, _in_ int fileId, bool direct_flag);// 在写时打开文件File WSOpenFile(_in_ char* file_name, _in_ int fileId, bool direct_flag);// 初始化自由空间管理器void InitCstoreFreeSpace(CStoreAllocateStrategy strategy);// 关闭文件void CloseFile(_in_ File fd) const;public:// 存储文件节点信息CFileNode m_cnode;private:// 列文件名的通用前缀char m_fileNamePrefix[MAXPGPATH];// 列文件名char m_fileName[MAXPGPATH];// 自由空间管理器CStoreFreeSpace* m_freespace;// 当前读写文件描述符File m_fd;// 分配策略:追加、重用CStoreAllocateStrategy m_strategy;// 追加模式标志bool append_only;// 是否启用 2 字节对齐bool is_2byte_align;
};
本文将首先围绕 CUStorage 类中的部分成员函数展开学习。
CUStorage::SaveCU 函数
CUStorage::SaveCU 函数的主要作用是将指定的 CU 数据保存到存储中,支持大文件存储,通过循环写入实现对超过单个文件大小限制的 CU 数据的存储。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
/** 将 CU 数据保存到存储中,支持大文件存储。* 参数:* - write_buf: 待写入的数据缓冲区指针* - offset: 写入的起始偏移量* - size: 待写入的数据大小* - direct_flag: 是否使用直接 I/O,通常用于绕过系统缓存* - for_extension: 是否为扩展操作,用于判断是否刷新数据文件*/
void CUStorage::SaveCU(char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension)
{// 根据偏移量计算写入的文件 ID 和在文件内的偏移量int writeFileId = offset / MAX_FILE_SIZE;uint64 writeOffset = offset % MAX_FILE_SIZE;// 计算当前写入的大小,不超过文件剩余大小int write_size = std::min(size, (int)(MAX_FILE_SIZE - writeOffset));int left_size = size - write_size;// 获取表空间 OIDOid tableSpaceOid = m_cnode.m_rnode.spcNode;// 临时文件名缓冲区char tmpFileName[MAXPGPATH] = {0};errno_t rc = 0;// 如果是追加模式,检查表空间使用是否超过最大值if (append_only)TableSpaceUsageManager::IsExceedMaxsize(tableSpaceOid, size);// 循环写入数据,直至全部数据写入完成while (write_size > 0) {// 获取当前写入的文件名GetFileName(tmpFileName, MAXPGPATH, writeFileId);// 如果文件名发生变化,关闭之前的文件并打开新的文件if (strcmp(tmpFileName, m_fileName) != 0) {if (m_fd != FILE_INVALID) {/** 如果切换数据文件,刷新数据。在文件扩展期间不执行此操作,因为很快就会 fsync 实际数据。*/if (!for_extension)FlushDataFile();FileClose(m_fd);}// 打开新文件,并更新当前文件名m_fd = OpenFile(tmpFileName, writeFileId, direct_flag);Assert(m_fd != FILE_INVALID);rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName);securec_check_c(rc, "\0", "\0");}Assert(m_fd != FILE_INVALID);// 将数据写入文件int writtenBytes = FilePWrite(m_fd, write_buf, write_size, writeOffset);// 检查写入是否成功if (writtenBytes != write_size) {int align_size = is_2byte_align ? ALIGNOF_TIMESERIES_CUSIZE : ALIGNOF_CUSIZE;// 报告 I/O 错误SaveCUReportIOError(tmpFileName, writeOffset, writtenBytes, write_size, size, align_size);}// 更新文件 ID、偏移量和数据缓冲区指针++writeFileId;writeOffset = 0;write_buf += write_size;// 计算下一轮写入的大小write_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);left_size -= write_size;}// 检查是否所有数据均已写入if (left_size != 0) {ereport(ERROR, (errcode_for_file_access(),errmsg("write file \"%s\" failed in savecu!", tmpFileName)));}
}
CUStorage::GetFileName 函数
CUStorage::GetFileName 函数用于获取 CU 文件的文件名。它将文件名构造为以给定前缀 m_fileNamePrefix 开头,后跟文件 ID 的形式。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
void CUStorage::GetFileName(_out_ char* fileName, _in_ const size_t capacity, _in_ const int fileId) const
{Assert(fileId >= 0); // 断言文件ID应为非负数// 表示一个关系的一个列的CU文件。// 与bcm文件名不同,其文件列表为:// 16385_C1.0 16385_C1.1 16385_C1.2 ...int rc = snprintf_s(fileName, capacity, capacity - 1, "%s.%d", m_fileNamePrefix, fileId);securec_check_ss(rc, "", "");fileName[capacity - 1] = '\0'; // 确保文件名以 null 结尾
}
CUStorage::OverwriteCU 函数
此函数的主要作用是在远程读取 CU 数据后,需要覆盖本地 CU 数据。它通过循环写入实现对超过单个文件大小限制的 CU 数据的覆盖。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
/** 在远程读取 CU 数据后,需要覆盖本地 CU 数据。* 参数:* - write_buf: 待写入的数据缓冲区指针* - offset: 写入的起始偏移量* - size: 待写入的数据大小* - direct_flag: 是否使用直接 I/O,通常用于绕过系统缓存* - for_extension: 是否为扩展操作,用于判断是否刷新数据文件*/
void CUStorage::OverwriteCU(_in_ char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension)
{// 检查偏移量和大小int writeFileId = offset / MAX_FILE_SIZE;uint64 writeOffset = offset % MAX_FILE_SIZE;// 计算当前写入的大小,不超过文件剩余大小int write_size = std::min(size, (int)(MAX_FILE_SIZE - writeOffset));int left_size = size - write_size;// 临时文件名缓冲区char tmpFileName[MAXPGPATH] = {0};errno_t rc = 0;// 覆盖 CU,不增加最大大小while (write_size > 0) {// 获取当前写入的文件名GetFileName(tmpFileName, MAXPGPATH, writeFileId);// 如果文件名发生变化,关闭之前的文件并打开新的文件if (strcmp(tmpFileName, m_fileName) != 0) {if (m_fd != FILE_INVALID) {/** 如果切换数据文件,刷新数据。在文件扩展期间不执行此操作,因为很快就会 fsync 实际数据。*/if (!for_extension)FlushDataFile();FileClose(m_fd);}// 打开新文件,并更新当前文件名m_fd = OpenFile(tmpFileName, writeFileId, direct_flag);Assert(m_fd != FILE_INVALID);rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName);securec_check(rc, "\0", "\0");}// 如果文件句柄无效,报告文件访问错误if (m_fd == FILE_INVALID) {ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", tmpFileName)));}// 将数据写入文件int nbytes = 0;if ((nbytes = FilePWrite(m_fd, write_buf, write_size, writeOffset)) != write_size) {// 仅报告警告,不中断执行ereport(WARNING,(errcode_for_file_access(),errmsg("Overwrite CU failed. file \"%s\" , offset(%lu), size(%d), expect_write_size(%d), ""acture_write_size(%d): %m",tmpFileName,writeOffset,size,write_size,nbytes),handle_in_client(true)));}// 更新文件 ID、偏移量和数据缓冲区指针++writeFileId;writeOffset = 0;write_buf += write_size;// 计算下一轮写入的大小write_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);left_size -= write_size;}// 检查是否所有数据均已写入if (left_size != 0) {ereport(ERROR, (errcode_for_file_access(),errmsg("write file \"%s\" failed in OverwriteCU!", tmpFileName)));}
}
注释:“远程读取 CU 数据” 通常指的是从一个远程的存储位置或节点上获取 CU(Column Unit,列存储中的基本数据单元)的数据。在数据库系统中,特别是在分布式或集群环境中,可能存在将数据存储在不同的节点上的情况。当需要在一个节点上执行操作,但数据实际存储在另一个节点上时,就需要进行远程读取。
CUStorage::OverwriteCU 函数
CUStorage::OverwriteCU 函数主要目的是从 CU 文件中加载 CU 数据,并将其存储在相应的数据结构中,以便后续使用。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
/** @Description: 从 CU 文件中加载 CU 数据* @Param[IN/OUT] cuPtr: 待加载数据的 CU 对象* @Param[IN] direct_flag: 如果启用 ADIO 特性,使用 DIO(Direct I/O)* @Param[IN] inCUCache: 指示 cuPtr 是否在 CU 缓存中* @Param[IN] offset: CU 数据在逻辑文件中的逻辑偏移量* @Param[IN] size: CU 数据的大小* @See also: 有关更多信息,请参阅...*/
void CUStorage::LoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache)
{// 检查参数的有效性if (size < 0 || (uint64)size > MAX_FILE_SIZE) {ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),errmsg("CUStorage::LoadCU 中的无效大小(%u)", size)));}// 如果 size 为 0,则直接报错并返回if (size == 0) {cuPtr->m_compressedBufSize = 0;ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),errmsg("CUStorage::LoadCU 中的大小为 0")));return;}uint64 load_offset;int load_size;// 计算对齐后的偏移量和大小load_offset = GetAlignCUOffset(offset);cuPtr->m_head_padding_size = offset - load_offset;load_size = GetAlignCUSize(cuPtr->m_head_padding_size + size);// 分配加载缓冲区内存,并加载数据// 注意:为了避免在 readData 函数中越界读取,多分配 8 字节内存。cuPtr->m_compressedLoadBuf = (char*)CStoreMemAlloc::Palloc(load_size + 8, !inCUCache);Load(load_offset, load_size, cuPtr->m_compressedLoadBuf, direct_flag);// 设置 CU 数据的指针和大小cuPtr->m_compressedBuf = cuPtr->m_compressedLoadBuf + cuPtr->m_head_padding_size;cuPtr->SetCUSize(size);cuPtr->m_compressedBufSize = size;cuPtr->m_cache_compressed = true;
}
CUStorage::RemoteLoadCU 函数
CUStorage::RemoteLoadCU 函数的主要目的是从远程节点加载 CU 数据,并将其存储在相应的数据结构中,以供后续使用。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
/** @Description: 从远程节点加载 CU 数据* @Param[IN] cuPtr: 待加载数据的 CU 对象* @Param[IN] direct_flag: 如果启用 ADIO 特性,使用 DIO(Direct I/O)* @Param[IN] inCUCache: 指示 cuPtr 是否在 CU 缓存中* @Param[IN] offset: CU 数据在逻辑文件中的逻辑偏移量* @Param[IN] size: CU 数据的大小* @See also: 有关更多信息,请参阅...*/
void CUStorage::RemoteLoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache)
{/* 调用方应该为 m_compressedLoadBuf 分配内存 */Assert(cuPtr->m_compressedLoadBuf != NULL);/* 获取偏移量和大小 */uint64 load_offset = GetAlignCUOffset(offset);cuPtr->m_head_padding_size = offset - load_offset;int load_size = GetAlignCUSize(cuPtr->m_head_padding_size + size);/* 获取当前 XLog 插入位置 */XLogRecPtr cur_lsn = GetInsertRecPtr();/* 获取远程地址 */char remote_address1[MAXPGPATH] = {0}; /* remote_address1[0] = '\0'; */char remote_address2[MAXPGPATH] = {0}; /* remote_address2[0] = '\0'; */GetRemoteReadAddress(remote_address1, remote_address2, MAXPGPATH);const char* remote_address = remote_address1;int retry_times = 0;retry:if (remote_address[0] == '\0' || remote_address[0] == ':')ereport(ERROR, (errcode(ERRCODE_IO_ERROR), (errmodule(MOD_REMOTE), errmsg("远程不可用"))));ereport(LOG,(errmodule(MOD_REMOTE),errmsg("从远程节点读取 CU 文件,%s 偏移 %lu 大小 %d,源地址:%s",GetColumnFileName(),offset,size,remote_address)));PROFILING_REMOTE_START();int ret_code = ::RemoteGetCU(remote_address,m_cnode.m_rnode.spcNode,m_cnode.m_rnode.dbNode,m_cnode.m_rnode.relNode,m_cnode.m_attid,load_offset,load_size,cur_lsn,cuPtr->m_compressedLoadBuf);PROFILING_REMOTE_END_READ(size, (ret_code == REMOTE_READ_OK));if (ret_code != REMOTE_READ_OK) {if (IS_DN_DUMMY_STANDYS_MODE() || retry_times >= 1) {ereport(ERROR,(errcode(ERRCODE_IO_ERROR),(errmodule(MOD_REMOTE),errmsg("从 %s 读取失败,%s", remote_address, RemoteReadErrMsg(ret_code)))));} else {ereport(WARNING,(errmodule(MOD_REMOTE),errmsg("从 %s 读取失败,%s,尝试另一个地址", remote_address, RemoteReadErrMsg(ret_code)),handle_in_client(true)));/* 检查中断 */CHECK_FOR_INTERRUPTS();remote_address = remote_address2;++retry_times;goto retry; /* 跳出 retry_times >= 1 */}}// CU 数据已加载完成,因此设置 CU 大小。// 我们将在解压缩 CU 数据时检查此值。cuPtr->m_compressedBuf = cuPtr->m_compressedLoadBuf + cuPtr->m_head_padding_size;cuPtr->SetCUSize(size);cuPtr->m_compressedBufSize = size;cuPtr->m_cache_compressed = true;
}
CUStorage::Load 函数
CUStorage::RemoteLoadCU 函数的目的是从文件中读取数据,然后存储到指定的缓冲区 outbuf 中。函数会根据给定的偏移量和大小,从相应的文件中读取数据块。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
void CUStorage::Load(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag)
{int readFileId = CU_FILE_ID(offset); // 获取读取文件的文件IDuint64 readOffset = CU_FILE_OFFSET(offset); // 获取在文件中的偏移量int read_size = min(size, (int)(MAX_FILE_SIZE - readOffset)); // 计算读取的大小,不超过文件剩余大小int left_size = size - read_size; // 剩余需要读取的大小char* read_buf = outbuf; // 读取缓冲区指针char tmpFileName[MAXPGPATH]; // 临时文件名errno_t rc = 0;while (read_size > 0) {GetFileName(tmpFileName, MAXPGPATH, readFileId); // 获取当前文件名if (strcmp(tmpFileName, m_fileName) != 0) {if (m_fd != FILE_INVALID)FileClose(m_fd); // 关闭之前打开的文件m_fd = OpenFile(tmpFileName, readFileId, direct_flag); // 打开新文件if (m_fd == FILE_INVALID) {ereport(ERROR, (errcode_for_file_access(), errmsg("无法打开文件 \"%s\"", tmpFileName)));}rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName); // 更新当前文件名securec_check_c(rc, "\0", "\0");}int nbytes = FilePRead(m_fd, read_buf, read_size, readOffset); // 从文件读取数据if (nbytes != read_size) {LoadCUReportIOError(tmpFileName, readOffset, nbytes, read_size, size); // 报告读取错误}++readFileId;readOffset = 0;read_buf += read_size;read_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);left_size -= read_size;}if (left_size != 0) {ereport(ERROR, (errcode_for_file_access(),errmsg("在加载数据时读取文件 \"%s\" 失败!", tmpFileName)));}
}
CUStorage::WSLoad 函数
CUStorage::WSLoad 函数是用于在工作空间(WS)中加载数据的。它会读取指定偏移量和大小的数据块,并将其存储到指定的缓冲区 outbuf 中。函数会检查读取的大小是否有效,以及是否符合对齐要求。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
int CUStorage::WSLoad(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag)
{int readFileId = CU_FILE_ID(offset); // 获取读取文件的文件IDuint64 readOffset = CU_FILE_OFFSET(offset); // 获取在文件中的偏移量int read_size = min(size, (int)(MAX_FILE_SIZE - readOffset)); // 计算读取的大小,不超过文件剩余大小int left_size = size - read_size; // 剩余需要读取的大小errno_t rc = 0;char* read_buf = outbuf; // 读取缓冲区指针char tmpFileName[MAXPGPATH]; // 临时文件名bool isCUReadSizeValid = false; // 检查读取的大小是否有效const int CUALIGNSIZE = is_2byte_align ? ALIGNOF_TIMESERIES_CUSIZE : ALIGNOF_CUSIZE;isCUReadSizeValid = (read_size > 0 && 0 == read_size % CUALIGNSIZE);if (!isCUReadSizeValid) {ereport(ERROR,(errcode_for_file_access(),errmsg("意外的CU文件读取信息: 偏移(%lu), 大小(%d), 文件ID(%d), 文件偏移(%lu), 期望读取大小(%d).",offset,size,readFileId,readOffset,read_size)));return -1;}while (read_size > 0) {GetFileName(tmpFileName, MAXPGPATH, readFileId); // 获取当前文件名if (strcmp(tmpFileName, m_fileName) != 0) {if (m_fd != FILE_INVALID)FileClose(m_fd); // 关闭之前打开的文件m_fd = WSOpenFile(tmpFileName, readFileId, direct_flag); // 打开新文件if (FILE_INVALID == m_fd)return 0;rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName); // 更新当前文件名securec_check(rc, "\0", "\0");}int nbytes = 0;/* IO collector and IO scheduler for cstore insert */if (ENABLE_WORKLOAD_CONTROL)IOSchedulerAndUpdate(IO_TYPE_READ, 1, IO_TYPE_COLUMN);if ((nbytes = FilePRead(m_fd, read_buf, read_size, readOffset)) != read_size) {if (0 == nbytes) {if (u_sess->attr.attr_storage.HaModuleDebug)ereport(NOTICE,(errcode_for_file_access(),errmsg("HA-WSLoad: 读取文件 \"%s\" 获取了0字节,请检查相应的CU文件。",tmpFileName)));return 0;}if (nbytes % CUALIGNSIZE != 0) {ereport(ERROR,(errcode_for_file_access(),errmsg("读取文件 \"%s\" 失败, 偏移(%lu), 大小(%d), 期望读取大小(%d), ""实际读取大小(%d), 可能需要首先升级cstore数据文件",tmpFileName,offset,size,read_size,nbytes)));} else {ereport(ERROR,(errcode_for_file_access(),errmsg("无法读取文件 \"%s\", 偏移(%lu), 大小(%d), 期望读取大小(%d), ""实际读取大小(%d): %m",tmpFileName,offset,size,read_size,nbytes)));}}++readFileId;readOffset = 0;read_buf += read_size;read_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);left_size -= read_size;}if (left_size != 0) {ereport(ERROR, (errcode_for_file_access(),errmsg("在WSLoad中读取文件 \"%s\" 失败!", tmpFileName)));}return size;
}
CUStorage::TruncateDataFile 函数
CUStorage::TruncateDataFile 函数的目的是在同一个事务块(XACT block)中,当关系创建和截断操作在同一个事务中发生时,截断列数据文件。函数会循环处理所有的数据文件,打开每个文件,截断其内容,然后关闭文件。如果截断操作失败,会发出警告。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
/** @Description: 在同一个XACT块中创建和截断关系时,截断列数据文件*/
void CUStorage::TruncateDataFile()
{int fileId = 0; // 文件ID初始化为0char tmpFileName[MAXPGPATH]; // 临时文件名的缓冲区while (1) { // 无限循环,直到没有更多的数据文件if (!IsDataFileExist(fileId)) // 如果数据文件不存在,跳出循环break;GetFileName(tmpFileName, MAXPGPATH, fileId); // 获取数据文件名File vfd = OpenFile(tmpFileName, fileId, false); // 打开数据文件if (FileTruncate(vfd, 0)) { // 截断文件内容ereport(WARNING, (errmsg("could not ftruncate file \"%s\": %m", tmpFileName))); // 如果截断失败,发出警告}CloseFile(vfd); // 关闭文件++fileId; // 增加文件ID,处理下一个文件}
}
CUStorage::IsDataFileExist 函数
CUStorage::IsDataFileExist 函数的目的是检查给定文件 ID 对应的数据文件是否存在。函数会构造数据文件名,然后使用 lstat 函数获取文件的状态信息。如果获取失败,说明文件不存在,返回 false;如果获取成功,说明文件存在,返回 true。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
bool CUStorage::IsDataFileExist(int fileId) const
{char tmpFileName[MAXPGPATH]; // 临时文件名的缓冲区GetFileName(tmpFileName, MAXPGPATH, fileId); // 获取数据文件名struct stat st; // 用于保存文件状态信息的结构if (lstat((const char*)tmpFileName, &st) == -1) // 获取文件状态信息return false; // 如果获取失败,文件不存在return true; // 如果获取成功,文件存在
}
CUStorage::GetBcmFileName 函数
CUStorage::GetBcmFileName 函数的目的是构造与给定文件 ID 相关的 BCM 文件名。函数首先使用 snprintf_s 构造文件名,格式为 “%s_%s.%d” 或 “%s_%s”,其中 %s 会被替换为文件名前缀和 BCM fork 的名称,%d 会被替换为文件 ID。如果文件 ID 大于0,表示有序列号,会构造带有序列号的文件名;如果文件 ID 为 0,表示无序列号,会构造不带序列号的文件名。函数最后确保字符串以 null 结尾。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
void CUStorage::GetBcmFileName(_out_ char* bcmfile, _in_ int fileId) const
{Assert(fileId >= 0); // 断言文件ID必须大于等于0// bcm file list: 16385_C1_bcm 16385_C1_bcm.1 16385_C1_bcm.2 ....int rc = 0; // 用于保存 `snprintf_s` 函数的返回值if (fileId > 0) { // 如果文件ID大于0,表示有序列号rc = snprintf_s(bcmfile, MAXPGPATH, MAXPGPATH - 1, "%s_%s.%d", m_fileNamePrefix, forkNames[BCM_FORKNUM], fileId);} else { // 如果文件ID为0,表示无序列号rc = snprintf_s(bcmfile, MAXPGPATH, MAXPGPATH - 1, "%s_%s", m_fileNamePrefix, forkNames[BCM_FORKNUM]);}securec_check_ss(rc, "", ""); // 检查 `snprintf_s` 的返回值bcmfile[MAXPGPATH - 1] = '\0'; // 确保字符串以 null 结尾
}
CUStorage::TruncateBcmFile 函数
CUStorage::TruncateBcmFile 函数的目的是==截断与给定文件 ID 相关的 BCM 文件==。函数首先初始化文件 ID 为 0,然后进入一个无限循环。在每次循环中,它检查与当前文件 ID 相关的 BCM 文件是否存在,如果不存在则退出循环。如果文件存在,函数获取 BCM 文件名,然后打开文件,尝试截断文件大小为 0。如果截断文件失败,函数会发出警告。最后,函数关闭文件,然后增加文件 ID,以便处理下一个 BCM 文件。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
/** @Description: truncate column bcm files which relation CREATE and TRUNCATE in same XACT block*/
void CUStorage::TruncateBcmFile()
{int fileId = 0; // 初始化文件ID为0char tmpFileName[MAXPGPATH]; // 用于保存文件名的缓冲区while (1) { // 无限循环,直到找不到更多的BCM文件为止if (!IsBcmFileExist(fileId)) // 如果BCM文件不存在,退出循环break;GetBcmFileName(tmpFileName, fileId); // 获取BCM文件名File vfd = OpenFile(tmpFileName, fileId, false); // 打开BCM文件if (FileTruncate(vfd, 0)) { // 如果截断文件失败,发出警告ereport(WARNING, (errmsg("could not ftruncate file \"%s\": %m", tmpFileName)));}CloseFile(vfd); // 关闭文件++fileId; // 增加文件ID,处理下一个BCM文件}
}
CUStorage::IsBcmFileExist 函数
CUStorage::IsBcmFileExist 函数的目的是检查与给定文件 ID 相关的 BCM 文件是否存在。函数首先获取 BCM 文件名,然后使用 lstat 函数检查文件是否存在。如果 lstat 返回 -1,说明文件不存在,函数返回 false;否则,说明文件存在,函数返回 true。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp
)
bool CUStorage::IsBcmFileExist(_in_ int fileId) const
{char tmpFileName[MAXPGPATH]; // 用于保存文件名的缓冲区GetBcmFileName(tmpFileName, fileId); // 获取BCM文件名struct stat st; // 用于存储文件状态信息的结构体if (lstat((const char*)tmpFileName, &st) == -1) // 使用lstat检查文件是否存在return false; // 如果文件不存在,返回falsereturn true; // 如果文件存在,返回true
}
总结
CUStorage 类相关成员函数操作较多,这里不一一列举了,感兴趣的读者可自行阅读源码。
99%的人还看了
相似问题
猜你感兴趣
版权申明
本文"【 OpenGauss源码学习 —— 列存储(CUStorage)】":http://eshow365.cn/6-40810-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!
- 上一篇: PC端使子组件的弹框关闭
- 下一篇: 学习c#的第十六天