存储是网络游戏开发中不可缺少的部分。在存储方面,不同的游戏公司采用不同的数据库,有使用 MySQL 的,也有使用 MongoDB 的。游戏数据库版本迭代加快,直接在数据库中存储 Blob 数据, 是与 Web 较为不同有特色的点。
调用 MySQL 时,使用的第三方库为 MySQL 的 Connector/C。一般不使用 Connector/C++。Connector/C++与 Connector/C 的区别在于 C++版本采用了类的方式操作数据库,而 C 语言版本只提供 API,根据框架的自身特点,C 语言版 本可能更适合一些,而且它更方便、高效。
int mysql_library_init(int argc, char **argv, char **groups)
void mysql_library_end(void)
函数 mysql_library_init 和 mysql_library_end 是一对用于初始化和释放的函数。 mysql_library_init 需要在调用其他函数之前调用,用来初始化 MySQL 库。而 mysql_library_end 调用可以释放内 存,在调用 mysql_close 之后调用,以帮助释放内存数据。
*mysql_init(MYSQL *mysql) MYSQL
该函数有两种用法:一种是传入 MySQL 对象;另一种是传入 nullptr 参数。两种调用都会返回 MySQL 对象指针,如果 有传入值,传入返回的对象就是同一个;如果传入的参数为 nullptr,就会返回一个实例。这两者的差别在于谁来管理 MySQL 对象的实例,如果是在外部创建的对象,在调用 mysql_close 之后就需要手动释放该对象,以免造成内存泄漏。 如果是由 mysql_init 创建的对象,就只需要关闭连接。
void mysql_close(MYSQL *mysql)
该函数用于关闭一个连接,如果 MySQL 实例是库生成的,该函数就会同时释放该对象。
*mysql_real_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd, const char *db, unsigned int port, const char *unix_socket, unsigned long client_flag); MYSQL
第 5 个参数为想要连接的数据库的名字,可以为 nullptr,表示只是想产生一个连接,但不选择具 体数据库,后续调用其他函数选定具体的数据库。
连接一个给定名字的数据库时,如果这个数据库不存在,就会出现错误,而 MySQL 对象也不可再用,需要关 闭。鉴于这种情况,在第一次连接时就可以将数据库名设为 nullptr,让它进行一个默认的连接,再调用 mysql_select_db 函数进行数据库的选择,若选择失败,则说明所需的数据库不存在,这时可以创建需要的数据库。
int mysql_options(MYSQL *mysql, enum mysql_option option, const void *arg)
该函数是在 mysql_init 之后、mysql_real_connect 之前调用的,对 MySQL
对象进行一些属性设置
常用属性:
int mysql_select_db(MYSQL *mysql, const char *db)
该函数输入一个需要连接的数据库名。返回值 0 表示成功,非 0 即为出错编号。该函数指定一个数据库作为当前选 中的数据库。在调用该函数时,如果用户没有指定数据库的权限就会出错。
unsigned int mysql_errno(MYSQL *mysql)
在每一个函数调用之后,如果出现错误,就可以通过调用函数 mysql_errno 得到当前错误的编号。
int mysql_ping(MYSQL *mysql)
该函数检查连接是否处于正常工作中。在函数 mysql_options 的设置中,可以打开自动连接的开关,当网络断开时,调用 mysql_ping 会自动重新连接。
MysqlConnector 类基于 MysqlBase 类,该类中实现了 IMessageSystem 接口, 处理诸如查询角色或创建角色这样的协议,MysqlBase 类则提供了连接 MySQL 以及读取和查询的基本功能。
class MysqlConnector : public MysqlBase, public Entity<MysqlConnector>, public IMessageSystem, public IAwakeFromPoolSystem<>{}
class MysqlBase
{
public:
bool ConnectInit();
virtual void Disconnect();
//...
protected:
* _pDbCfg{ nullptr };
DBConfig* _pMysql{ nullptr };
MYSQL* _pMysqlRes{ nullptr };
MYSQL_RESint _numFields{ 0 };
* _pMysqlFields{ nullptr };
MYSQL_FIELD};
连接数据库,在MysqlConnector::AwakeFromPool
进行
void MysqlConnector::AwakeFromPool(){
auto pYaml = Yaml::GetInstance();
auto pConfig = pYaml->GetConfig(APP_DB_MGR);
auto pDbConfig = dynamic_case<DBMgrConfig*>(pConfig);
= pDbConfig->GetDBConfig(DBMgrConfig::DBTypeMysql);
_pDbCfg if(_pDbCfg == nullptr){
("Failed to get mysql config");
LOG_ERRORreturn;
}
();
Connect}
初始化函数最后调用了函数 Connect 进行连接,调用了基 类的连接函数 MysqlBase::ConnectInit。该函数进行了一些连接前的基本操作,初始 化操作成功之后,调用 mysql_real_connect 函数发起一个连接。如果经检查没有错误, 就认为连接成功。
bool MysqlBase::ConnectInit()
{
();
DisConnect= mysql_init(nullptr);
_pMysql if (_pMysql == nullptr)
{
();
CheckMysqlError("mysql_init == nullptr");
LOG_ERRORreturn false;
}
// 设置连接等待时间,10s
int outtime = 10;
(_pMysql, MYSQL_OPT_CONNECT_TIMEOUT, &outtime);
mysql_optionsbool reConnect = false; // 不自动重连
(_pMysql, MYSQL_OPT_RECONNECT, &reConnect);
mysql_optionsreturn true;
}
关闭连接,在 MysqlConnector 与 MysqlBase 中都有 Disconnect 方法
void MysqlConnector::Disconnect()
{
();
CleanStmts::Disconnect();
MysqlBase}
void MysqlBase::Disconnect()
{
//...
if (_pMysql != nullptr)
{
(_pMysql);
mysql_close= nullptr;
_pMysql }
}
在关闭连接时调用了库函数 mysql_close 以释放内存。虽然在游戏框架上 使用了多线程,但是对于一个 MysqlConnector 对象而言,它遵照 Actor 原则, 代码没有耦合性,因而线程是安全的。如果需要,那么可以启动多个线程来执 行数据库操作,每一个线程都生成一个独立的 MysqlConnector 实例,每一个线 程都相当于一个 MySQL 客户端,互不影响。
对于需要多次执行的语句,预处理是一种非常高效的方式,其原理是一次生成语句,每次 执行时传入参数,以减少数据的传递。一次生成语句的好处是不用每次对 SQL 语句进行解析,极 大地提高了效率。预处理就像一个函数,每次执行时只需要填入不同的参数,就能得到不同的 结果。
*mysql_stmt_init(MYSQL *mysql) MYSQL_STMT
MySQL 库的预处理使用一个名为 MYSQL_STMT 的结构,调用该函数即可创建一个 MYSQL_STMT 指 针,返回值不是 nullptr 时则为成功。
(MYSQL_STMT *stmt) my_bool mysql_stmt_close
该函数销毁传入的 MYSQL_STMT 指针。
int mysql_stmt_prepare(MYSQL_STMT *stmt, const char *stmt_str, unsigned long length)
该函数将一个 SQL 语句写入预处理结构中,第二个参数即为 SQL 语句,该语句不需要有结束 符分号,即不需要符号“;”。传入的 SQL 语句在参数的位置用“?”代替。 返回值为非 0 时,表示有错误,可以用 mysql_stmt_error 函数查看错误编码。
const char *mysql_stmt_error(MYSQL_STMT *stmt)
一旦发现某个预处理函数有异常或出错,就可以通过调用该函数来获取错误描述。
(MYSQL_STMT *stmt, MYSQL_BIND *bind) my_bool mysql_stmt_bind_param
前面传入的 SQL 语句中,关于动态参数的部分是用“?”来代替的。而函数
mysql_stmt_bind_param
是专门为这些“?”准备的,利用
MYSQL_BIND 结构提供参数。像函数一
样,一个预处理在实际执行阶段需要绑定实际的参数。
int mysql_stmt_execute(MYSQL_STMT *stmt)
绑定完参数之后的预处理指针就可以调用执行函数来执行。如果返回结果不为 0,就表示有 错误。
(MYSQL_STMT *stmt) my_ulonglong mysql_stmt_affected_rows
预处理执行之后,我们可以通过该函数来获取结果行的个数,即当前执行的预处理结果中 有多少行数据。
以创建角色为例,先创建“创建角色的预处理”,并将这个预处理实例存 入一个字典中,便于每次使用时快速调用。当创建角色的协议发送到 dbmgr 进 程时,调用预处理实例并传入参数,这些参数是玩家名字和玩家的初始数据。 最后,调用执行函数写入数据库中。
在 MysqlConnector 类创建之初就初始化两个预处理:一个是插入玩家数据的预处理;另一个用于更新玩家数据。
void MysqlConnector::InitStmts()
{
* stmt = CreateStmt("insert into player ( sn, account, name, savetime, createtime ) value ( ?, ?, ?, now(), now() )");
DatabaseStmt.insert(std::make_pair(DatabaseStmtKey::StmtCreate, stmt));
_mapStmt= CreateStmt("update player set base=?, misc=?,savetime=now() where sn = ?");
stmt .insert(std::make_pair(DatabaseStmtKey::StmtSave, stmt));
_mapStmt("\tMysqlConnector::InitStmts successfully!");
LOG_DEBUG}
DatabaseStmt 数据结构原型为
struct DatabaseStmt
{
//MYSQL_STMT指针
*stmt{nullptr};
MYSQL_STMT //MYSQL_BIND指针数组
*bind{nullptr};
MYSQL_BIND //一个内存地址块,这个内存地址块为了存储绑定数据,可以给它一个40960的长度。
//绑定的数据,即使是二进制数据,总大小也不会超过40960
char *bind_buffer{nullptr};
int bind_index;//内存块当前读写地址位置偏移值
//bind_index为当前bind的位置偏移值,bind数组每加一个数据,bind_index自增一次
int bind_buffer_index;
};
生成 DatabaseStmt 在MysqlConnector::CreateStmt
方法内
*MysqlConnector::CreateStmt(const char *sql) const
DatabaseStmt {
int str_len = strlen(sql);
*stmt = new DatabaseStmt();
DatabaseStmt int param_count = 0;
// 从MySQL库中创建MYSQL_STMT指针
->stmt = mysql_stmt_init(_pMysql);
stmtif (mysql_stmt_prepare(stmt->stmt, sql, str_len) != 0)
{
return nullptr; // 失败
}
for (int i = 0; i < str_len; i++)
{
if ((sql[i] == '?') || (sql[i] == '@'))
++; // 统计参数个数
param_count}
if (param_count > 0)
{
// 为每个参数创建MYSQL_BIND空间
->bind = new MYSQL_BIND[param_count];
stmt(stmt->bind, 0, sizeof(MYSQL_BIND) * param_count);
memset// 数据备用空间
->bind_buffer = new char[MAX_BIND_BUFFER];
stmt}
else
{
->bind = nullptr;
stmt->bind_buffer = nullptr;
stmt}
return stmt;
}
创建角色时,创建角色的协议被 MysqlConnector 类捕捉到,它的处理函数为 HandleCreatePlayer。
void MysqlConnector::HandleCreatePlayer(Packet *pPacket)
{
auto protoCreate = pPacket->ParseToProto<Proto::CreatePlayerToDB>();
auto protoPlayer = protoCreate.player();
*stmt = GetStmt(DatabaseStmtKey::StmtCreate);
DatabaseStmt if (stmt == nullptr)
return;
// ...
// create
(stmt);
ClearStmtParam(stmt, protoPlayer.sn());
AddParamUint64(stmt, protoCreate.account().c_str());
AddParamStr(stmt, protoPlayer.name().c_str());
AddParamStr//...
if (ExecuteStmt(stmt))
{
.set_return_code(Proto::CreatePlayerReturnCode::CPR_Create_OK);
protoRs}
//...
}
收到创建协议之后,从预处理字典中取出了已经准备好的 DatabaseStmt。随后调用了 ClearStmtParam、AddParamUint64 和 AddParamStr 三个函数,分别是为了清理旧数据、压入一个 uint64 参数和压入一个字符串到预处理结构中。
//让缓存回到初始状态
void MysqlConnector::ClearStmtParam(DatabaseStmt *stmt)
{
->bind_index = 0;
stmt->bind_buffer_index = 0;
stmt}
创建角色的 SQL
语句insert into player(sn,account,name,savetime,createtime) value(?,?,?,now(),now())
有
3 个参数,分别是 uint64 和两个 string。
void MysqlConnector::AddParamUint64(DatabaseStmt *stmt, uint64 val)
{
*pBind = &stmt->bind[stmt->bind_index];
MYSQL_BIND ->buffer_type = MYSQL_TYPE_LONGLONG;
pBind->buffer = &stmt->bind_buffer[stmt->bind_buffer_index];
pBind->is_unsigned = true;
pBind*static_cast<uint64 *>(pBind->buffer) = val;
->bind_index++;
stmt->bind_buffer_index += sizeof(uint64);
stmt}
void MysqlConnector::AddParamStr(DatabaseStmt *stmt, const char *val)
{
*pBind = &stmt->bind[stmt->bind_index];
MYSQL_BIND int len = strlen(val);
->buffer_type = MYSQL_TYPE_STRING;
pBind->buffer = &stmt->bind_buffer[stmt->bind_buffer_index];
pBind->length = (unsigned long *)&stmt->bind_buffer[stmt->bind_buffer_index + len + 1];
pBind((char *)pBind->buffer, len + 1, val, len + 1);
engine_strncpy*(pBind->length) = len;
->buffer_length = len;
pBind->bind_index++;
stmt->bind_buffer_index += (len + 1 + sizeof(unsigned long *));
stmt}
上面的代码并不难理解,如果使用过 mysql c 的话
int mysql_query(MYSQL *mysql, const char *stmt_str);
int mysql_real_query(MYSQL *mysql, const char *stmt_str, unsigned long length);
库中提供了两个查询函数。函数 mysql_query 执行指定的 SQL 语句,参数
stmt_str 可 以不带 SQL
语句的结束符“;”,但必须是有结束符的字符串,即最后以’\0’字符结尾。
mysql_query 函数使用的 SQL
语句不能带二进制数据,如果需要带二进制数据,就需要使用函数
mysql_real_query。从函数定义上能看得出来,mysql_real_query 函数执行 SQL
语句的时候,使用的参数是 char*
和它的长度。这个
char*
的字符串是允许存在’\0’这种结束符的。这就是这两个函数本质上的区别。
可以在 mysql_query 调用之后调用函数 mysql_field_count 查看有多少列数据。如果执行的语句不是一个 select,那么 mysql_field_count 调用的结果可能为 0。
*mysql_store_result(MYSQL *mysql) MYSQL_RES
调用 mysql_query 函数之后可以用 mysql_store_result 得到结果,该函数将全部结果缓存到 MYSQL_RES 结构中并返回,MYSQL_RES 用完之后需要使用 mysql_free_result 释放数 据。函数 mysql_store_result 返回为空时,不意味着失败。如果执行语句是 insert 语 句,mysql_store_result 就会返回空,因为 insert 语句并没有集合可以返回。
unsigned int mysql_num_fields(MYSQL_RES *result)
调用函数 mysql_store_result 的结果不为空时,可以调用 mysql_num_fields 来判断 有多少列。
*mysql_fetch_field(MYSQL_RES *result) MYSQL_FIELD
该函数的使用相当于一个迭代器,对 MYSQL_RES 的列数据进行一个迭代,当返回值为 空时表示没有更多的列了。
(MYSQL_RES *result) MYSQL_ROW mysql_fetch_row
函数 mysql_fetch_row 也是一个迭代器,迭代的是 MYSQL_RES 集合,也就是 mysql_store_result 得到的集合
首先组织一条 SQL 语句,调用 mysql_query 得到一个结果集,再通过调用 mysql_fetch_row 函数得到 MYSQL_ROW 行数据。而每一行的具体数据则是由 MYSQL_ROW 类的操作函数读取数据的。
在 MysqlBase 类中,函数 Query 封装 mysql_query 函数。
bool MysqlBase::Query(const char *sql, my_ulonglong &affected_rows)
{
if (nullptr != _pMysqlRes)
{
(_pMysqlRes);
mysql_free_result= nullptr;
_pMysqlRes }
if (mysql_query(_pMysql, sql) != 0)
{
("Query error:" << mysql_error(_pMysql) << " sql:" << sql);
LOG_ERRORreturn false;
}
// maybe query is not a select
= mysql_store_result(_pMysql);
_pMysqlRes if (_pMysqlRes != nullptr)
{
= mysql_num_fields(_pMysqlRes);
_numFields = mysql_fetch_fields(_pMysqlRes);
_pMysqlFields }
= mysql_affected_rows(_pMysql);
affected_rows return true;
}
当收到查询玩家协议 L2DB_QueryPlayerList 时,MysqlConnector 类的处理函数 HandleQueryPlayerList 的实现如下。
void MysqlConnector::HandleQueryPlayerList(Packet *pPacket)
{
auto protoQuery = pPacket->ParseToProto<Proto::QueryPlayerList>();
(protoQuery.account(), pPacket->GetSocket());
QueryPlayerList}
void MysqlConnector::QueryPlayerList(std::string account, SOCKET socket)
{
;
my_ulonglong affected_rowsstd::string sql = strutil::format("select sn, name, base, item, misc from player where account = '%s'", account.c_str());
if (!Query(sql.c_str(), affected_rows))
{
("!!! Failed. MysqlConnector::HandleQueryPlayerList. sql:" << sql.c_str());
LOG_ERRORreturn;
}
::PlayerList protoRs;
Proto.set_account(account.c_str());
protoRs
::PlayerBase protoBase;
Protoif (affected_rows > 0)
{
std::string tempStr;
;
MYSQL_ROW rowwhile ((row = Fetch()))
{
auto pProtoPlayer = protoRs.add_player();
->set_sn(GetUint64(row, 0));
pProtoPlayer->set_name(GetString(row, 1));
pProtoPlayer
(row, 2, tempStr);
GetBlob.ParseFromString(tempStr);
protoBase->set_level(protoBase.level());
pProtoPlayer->set_gender(protoBase.gender());
pProtoPlayer}
}
("player list. account:" << account.c_str() << " player list size:" << protoRs.player_size() << " socket:" << socket);
LOG_DEBUG
// 没有找到也需要返回pResultPacket
(Proto::MsgId::L2DB_QueryPlayerListRs, socket, protoRs);
SendPacket}
在 MysqlBase 中定义了获取行以及字段的方法
::Fetch() const
MYSQL_ROW MysqlBase{
if (_pMysqlRes == nullptr)
return nullptr;
return mysql_fetch_row(_pMysqlRes);
}
::GetUint64(MYSQL_ROW row, int index)
uint64 MysqlBase{
if (row[index] == nullptr)
{
("!!! Failed. MysqlConnector::GetUint64");
LOG_ERRORreturn 0;
}
return atoll(row[index]);
}
// MYSQL_ROW的定义其实是 typedef char **MYSQL_ROW; /* return data as array of strings */
特别注意的是取二进制数据,需要先获取返回列数据的长度。
void MysqlBase::GetBlob(MYSQL_ROW row, int index, std::string &protoStr) const
{
unsigned long *pLengths = mysql_fetch_lengths(_pMysqlRes);
long blobLength = pLengths[index];
if (blobLength <= 0)
{
= "";
protoStr return;
}
char *blobByte = new char[blobLength + 1];
(row, index, blobByte, blobLength);
GetBlob[blobLength] = '\0';
blobByte= blobByte;
protoStr delete[] blobByte;
}
int MysqlBase::GetBlob(MYSQL_ROW row, int index, char *buf, unsigned long size) const
{
unsigned int l = size > 0 ? size : 0;
if (row[index] == nullptr)
{
("!!! Failed. MysqlConnector::GetBlob");
LOG_ERRORreturn 0;
}
unsigned long *lengths = mysql_fetch_lengths(_pMysqlRes);
if (lengths[index] < l)
= lengths[index];
l (buf, row[index], l);
memcpyreturn l;
}
一般来说,服务端每一个版本的代码都对应一个相应的 SQL 文件。导入 SQL 文件到数据库之后,服务器才可以正常地使用数据库。但这项功能要求在编译源代码的同时维护一系列 SQL 文件。长期来讲,这是一件非常令人恼火的事情,代码版本与 SQL 版本不一致 时就会出错。可以写一套用代码更新数据库的设计。
class MysqlTableUpdate : public MysqlBase, public Singleton<MysqlTableUpdate>
{
public:
void Check();
private:
bool CreateDatabaseIfNotExist();
bool UpdateToVersion(); // 检查DB数据,更新到最新版本
bool Update00(); // 00版本的数据修改
private:
typedef std::function<bool(void)> OnUpdate;
std::vector<OnUpdate> _update_func;
int const _version = 0;
};
为每个本版的修改加一个方法,在服务器 main 函数启动时,现进行数据库检查。
int main(int argc, char *argv[])
{
(0, nullptr, nullptr);
mysql_library_init//...
auto pTableUpdateObj = MysqlTableUpdate::Instance();
->Check();
pTableUpdateObj->DestroyInstance();
pTableUpdateObj//...
return 0;
}
在启动线程之前,调用 MysqlTableUpdate::Check 检查当前数据库版本是否为最新版本。
void MysqlTableUpdate::Check()
{
if (!ConnectInit())
return;
auto pYaml = Yaml::GetInstance();
auto pDbMgrCfig = dynamic_cast<DBMgrConfig*>(pYaml->GetConfig(APP_DB_MGR));
= pDbMgrCfig->GetDBConfig(DBMgrConfig::DBTypeMysql);
_pDbCfg if (_pDbCfg == nullptr)
{
("Init failed. get mysql config is failed.");
LOG_ERRORreturn;
}
("Mysql update connect. " << _pDbCfg->Ip.c_str() << ":" << _pDbCfg->Port << " starting... id:" << std::this_thread::get_id());
LOG_DEBUG
if (mysql_real_connect(_pMysql, _pDbCfg->Ip.c_str(), _pDbCfg->User.c_str(), _pDbCfg->Password.c_str(), nullptr, _pDbCfg->Port, nullptr, CLIENT_FOUND_ROWS))
{
(_pMysql, _pDbCfg->DatabaseName.c_str());
mysql_select_db}
int mysqlerrno = CheckMysqlError();
if (mysqlerrno == ER_BAD_DB_ERROR)
{
("Mysql. try create database:" << _pDbCfg->DatabaseName.c_str());
LOG_DEBUG
// 1049: Unknown database。 没有找到数据库,就新建一个
if (!CreateDatabaseIfNotExist())
{
();
Disconnectreturn;
}
(_pMysql, _pDbCfg->DatabaseName.c_str());
mysql_select_db= CheckMysqlError();
mysqlerrno }
if (mysqlerrno > 0)
{
();
Disconnectreturn;
}
// 检查版本,自动更新
if (!UpdateToVersion())
{
("!!!Failed. Mysql update. UpdateToVersion");
LOG_ERRORreturn;
}
(_pMysql);
mysql_ping= CheckMysqlError();
mysqlerrno if (mysqlerrno > 0)
{
();
Disconnectreturn;
}
("Mysql Update successfully! addr:" << _pDbCfg->Ip.c_str() << ":" << _pDbCfg->Port);
LOG_DEBUG}
在检查数据库时,首先创建一个 MySQL 连接,此时并没有选择数据库,函数 mysql_real_connect 中关于数据库名的参数值为 nullptr。连接成功之后,调用 mysql_select_db 函数试探数据库是否存在,如果返回错误码为 ER_BAD_DB_ERROR(1049),就认为可以创建一个数据库。
bool MysqlTableUpdate::CreateDatabaseIfNotExist()
{
// 是否存在数据库,如果不存在,则创建
std::string querycmd = strutil::format("CREATE DATABASE IF NOT EXISTS %s;", \_pDbCfg->DatabaseName.c_str());
;
my_ulonglong affected_rowsif (!Query(querycmd.c_str(), affected_rows))
{
("!!! Failed. MysqlConnector::CreateDatabaseIfNotExist. cmd:" << querycmd.c_str());
LOG_ERRORreturn false;
}
// 链接上的DB之后,选择指定的数据库
if (mysql_select_db(_pMysql, _pDbCfg->DatabaseName.c_str()) != 0)
{
("!!! Failed. MysqlConnector::CreateDatabaseIfNotExist: mysql_select_db:" << LOG4CPLUS_STRING_TO_TSTRING(_pDbCfg->DatabaseName));
LOG_ERRORreturn false;
}
// 设置数据为的字符集,从yaml中配置中读取我们需要的字符集
if (mysql_set_character_set(_pMysql, _pDbCfg->CharacterSet.c_str()) != 0)
{
("!!! Failed. MysqlConnector::CreateDatabaseIfNotExist: Could not set client connection character set to " << LOG4CPLUS_STRING_TO_TSTRING(_pDbCfg->CharacterSet));
LOG_ERRORreturn false;
}
// 设置了数据库大小写敏感,配置文件中为 utf8_general_ci
= strutil::format("ALTER DATABASE CHARACTER SET %s COLLATE %s", _pDbCfg->CharacterSet.c_str(), _pDbCfg->Collation.c_str());
querycmd if (!Query(querycmd.c_str(), affected_rows))
{
("!!! Failed. MysqlConnector::CreateDatabaseIfNotExist. cmd:" << LOG4CPLUS_STRING_TO_TSTRING(querycmd.c_str()));
LOG_ERRORreturn false;
}
// 创建一个version 表,使用了 InnoDB 方式
std::string create_version =
"CREATE TABLE IF NOT EXISTS `version` ("
"`version` int(11) NOT NULL,"
"PRIMARY KEY (`version`)"
") ENGINE=%s DEFAULT CHARSET=%s;";
std::string cmd = strutil::format(create_version.c_str(), "InnoDB", _pDbCfg->CharacterSet.c_str());
if (!Query(cmd.c_str(), affected_rows))
{
("!!! Failed. MysqlConnector::CreateTable. " << LOG4CPLUS_STRING_TO_TSTRING(cmd));
LOG_ERRORreturn false;
}
// 创建一个 player 表
std::string create_player =
"CREATE TABLE IF NOT EXISTS `player` ("
"`sn` bigint(20) NOT NULL,"
"`name` char(32) NOT NULL,"
"`account` char(64) NOT NULL,"
"`base` blob,"
"`item` blob,"
"`misc` blob,"
"`savetime` datetime default NULL,"
"`createtime` datetime default NULL,"
"PRIMARY KEY (`sn`),"
"UNIQUE KEY `NAME` (`name`),"
"KEY `ACCOUNT` (`account`)"
") ENGINE=%s DEFAULT CHARSET=%s;";
= strutil::format(create_player.c_str(), "InnoDB", _pDbCfg->CharacterSet.c_str());
cmd if (!Query(cmd.c_str(), affected_rows))
{
("!!! Failed. MysqlConnector::CreateTable" << LOG4CPLUS_STRING_TO_TSTRING(cmd));
LOG_ERRORreturn false;
}
// 最后创建完了,修改 version表的中version 字段,设为初始的0号版本
= "insert into `version` VALUES ('0')";
cmd if (!Query(cmd.c_str(), affected_rows))
{
("!!! Failed. MysqlConnector::CreateTable." << LOG4CPLUS_STRING_TO_TSTRING(cmd));
LOG_ERRORreturn false;
}
return true;
}
在整个开发或上线的过程中,数据表不可能是一成不变的。有需要时,需要对表结构进行更新。 创建表完成之后调用了 UpdateToVersionDB 进行升级,可以将函数指针放入一个数组,用版本号作为下标,数据库版本升级时依次调用。
bool MysqlTableUpdate::UpdateToVersion()
{
;
my_ulonglong affected_rowsstd::string sql = "select version from `version`";
if (!Query(sql.c_str(), affected_rows))
return false;
= Fetch();
MYSQL_ROW row if (row == nullptr)
return false;
int version = GetInt(row, 0);
if (version == _version)
return true;
// 如果DB版本不匹配,升级DB
for (int i = version + 1; i <= _version; i++) {
if (_update_func[i] == nullptr)
continue;
if (!_update_func[i]()) {
("UpdateToVersion failed!!!!!, version=" << i);
LOG_ERRORreturn false;
}
("update db to version:" << i);
LOG_INFO
// 成功之后,更改DB的version
std::string cmd = strutil::format("update `version` set version = %d", i);
if (!Query(cmd.c_str(), affected_rows)) {
("UpdateToVersion failed!!!!!, change version failed. version=" << i);
LOG_ERRORreturn false;
}
}
return true;
}
其实游戏数据库,在数据库中存储二进制数据是很常见的,因为需求频繁改动。
不需要更改列就可以实现数据的删除与新增。更新数据时不需要再编写更新函数,也就是使用 protobuf 定义的结构作为存储数据结构。作为协议使用时,protobuf 方便的序列化特性被广泛使用。除此之外,它还有一个非常给力的特性就是兼容性,这个特性用于存储时也非常给力。
因为 protobuf 是 TLV,Tag Length Value,根据 Tag 判断字段 Length 判断字段数据类型长度 Value 为存储数据值。 在一个协议中删除一个字段(但是删去字段的 tag 不能再用)、或者新增字段(新增 tag 必须是新的),这样的 protobuf 是前后兼容的。
= "proto3";
syntax ;
package Protoenum Gender
{
= 0;
none = 1;
male = 2;
female }
message PlayerBase{
= 1;
Gender gender = 2;
int32 level }
message LastWorld{
= 1;
int32 world_id = 2;
int64 world_sn = 3;
Vector3 position }
message PlayerMisc{
= 1; // 公共地图
LastWorld last_world = 2; // 副本地图
LastWorld last_dungeon //...
}
message Player{
= 1;
uint64 sn = 2;
string name = 3;
PlayerBase base = 4;
PlayerMisc misc }
PlayerBase 可以直接用二进制字段存储。
在进程 login 中验证完成之后,需要向 dgmgr 进程发起查询角色的协议,如果返回数据没有角色,就由客户端发起创建角色的协议。
如果 dbmgr 与 login 是不同的进程应该怎样通信。 可以建立 NetworkConnector 组件解决,只需让乙方连接到对象的 NetworkListen 就行了,两者之间就可以发送通信协议,但是需要保证之间的通信安全,因为两个进程跑在不同的物理地点、不同的物理主机都有可能。
int main(int argc, char *argv[])
{
const APP_TYPE curAppType = APP_TYPE::APP_LOGIN;
(curAppType, argc, argv);
ServerApp app.Initialize();
app//...
auto pYaml = Yaml::GetInstance();
auto pCommonConfig = pYaml->GetIPEndPoint(curAppType);
->CreateComponent<NetworkListen>(pCommonConfig->Ip, pCommonConfig->Port);
pThreadMgr->CreateComponent<NetworkConnector>((int)APP_TYPE::APP_DB_MGR, 0);
pThreadMgr//...
}
void NetworkConnector::AwakeFromPool(int appType, int appId)
{
auto pYaml = Yaml::GetInstance();
auto pComponent = pYaml->GetIPEndPoint((APP_TYPE)appType, appId);
if (pComponent == nullptr)
{
("can't find yaml config.");
LOG_ERRORreturn;
}
(pComponent->Ip, pComponent->Port);
Connectauto pNetworkLocator = ThreadMgr::GetInstance()->GetComponent<NetworkLocator>();
->AddConnectorLocator(this, (APP_TYPE)appType, appId);
pNetworkLocator}
在初始化的同时,将这个网络类放到了 NetworkLocator 组件中,方便发送数据时的定位。这样,login 进程就与 dbmgr 进程有了一个网络连接。
如现在 Account 接收到第三方账号验证的返回后,则进行提交角色查询协议到 dbmgr
void Account::HandleAccountCheckToHttpRs(Packet *pPacket)
{
auto proto = pPacket->ParseToProto<Proto::AccountCheckToHttpRs>();
auto pPlayer = _playerMgr.GetPlayer(proto.account());
if (pPlayer == nullptr)
{
std::cout << "can't find player. account:" << proto.account().c_str() << std::endl;
return;
}
::AccountCheckRs protoResult;
Proto.set_return_code(proto.return_code());
protoResult(Proto::MsgId::C2L_AccountCheckRs, pPlayer->GetSocket(), protoResult);
SendPacket// 验证成功,向DB发起查询
if (proto.return_code() == Proto::AccountCheckReturnCode::ARC_OK)
{
::QueryPlayerList protoQuery;
Proto.set_account(pPlayer->GetAccount().c_str());
protoQuery(Proto::MsgId::L2DB_QueryPlayerList, protoQuery, APP_DB_MGR);
SendPacket}
}
则 IMessageSystem 则需要定义多种形式,以应对多种情况.
void IMessageSystem::SendPacket(const Proto::MsgId msgId, const SOCKET socket, google::protobuf::Message &proto)
{
const auto pPacket = CreatePacket(msgId, socket);
->SerializeToBuffer(proto);
pPacket(pPacket);
SendPacket}
void IMessageSystem::SendPacket(const Proto::MsgId msgId, google::protobuf::Message &proto, APP_TYPE appType, int appId)
{
auto packet = CreatePacket(msgId, 0);
->SerializeToBuffer(proto);
packet(packet, appType, appId);
SendPacket}
void IMessageSystem::SendPacket(Packet *packet, APP_TYPE appType, int appId)
{
auto pNetworkLocator = ThreadMgr::GetInstance()->GetComponent<NetworkLocator>();
auto pNetwork = pNetworkLocator->GetNetworkConnector(appType, appId);
if (pNetwork != nullptr)
{
->SetSocket(pNetwork->GetSocket());
packet->SendPacket(packet);
pNetworkreturn;
}
if ((Global::GetInstance()->GetCurAppType() & appType) != 0)
{
// 正好在当前进程中,直接转发
// 例如 curapptype == all 的时候
(packet);
DispatchPacket}
else
{
("can't find network. appType:" << AppTypeMgr::GetInstance()->GetAppName(appType).c_str() << " appId:" << appId);
LOG_ERROR}
}
void IMessageSystem::SendPacket(Packet *pPacket)
{
auto pNetworkLocator = ThreadMgr::GetInstance()->GetComponent<NetworkLocator>();
auto pNetwork = pNetworkLocator->GetNetworkConnector(pPacket->GetSocket());
if (pNetwork != nullptr)
{
->SendPacket(pPacket);
pNetworkreturn;
}
// all in one 时,找不到Network,就向所有线程发送协议
if (Global::GetInstance()->GetCurAppType() == APP_ALL && pPacket->GetSocket() == 0)
{
(pPacket);
DispatchPacketreturn;
}
// 最后试着向listen发送数据
= pNetworkLocator->GetListen(NetworkTcpListen);
pNetwork ->SendPacket(pPacket);
pNetwork}