🐞 fix(ftpserver): 数据传输完毕及时释放端点

This commit is contained in:
huheng@kylinos.cn 2022-12-08 14:54:15 +08:00
parent b3f3b7f49d
commit 52d01916b6
3 changed files with 124 additions and 106 deletions

View File

@ -62,6 +62,11 @@ asio::ip::tcp::socket &FtpSession::getSocket()
return command_socket_;
}
asio::ip::tcp::acceptor &FtpSession::getAcceptor()
{
return data_acceptor_;
}
void FtpSession::sendFtpMessage(const FtpMessage &message)
{
sendRawFtpMessage(message.str());
@ -322,6 +327,7 @@ void FtpSession::handleFtpCommandPASV(const std::string & /*param*/)
}
if (data_acceptor_.is_open()) {
std::cout << "acceptor is open" << std::endl;
asio::error_code ec;
data_acceptor_.close(ec);
if (ec) {
@ -343,7 +349,13 @@ void FtpSession::handleFtpCommandPASV(const std::string & /*param*/)
return;
}
}
data_acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
asio::error_code ec;
data_acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec) {
std::cerr << "Error setting reuse_address option: " << ec.message() << std::endl;
sendFtpMessage(FtpReplyCode::SERVICE_NOT_AVAILABLE, "Failed to enter passive mode.");
return;
}
{
asio::error_code ec;
data_acceptor_.bind(endpoint);
@ -1115,6 +1127,14 @@ void FtpSession::writeDataToSocket(std::shared_ptr<asio::ip::tcp::socket> data_s
me->sendFtpMessage(FtpReplyCode::CLOSING_DATA_CONNECTION, "Done");
s_freePorts.push(port);
data_socket->close();
if (me->getAcceptor().is_open()) {
std::cout << "acceptor is open" << std::endl;
asio::error_code ec;
me->getAcceptor().close(ec);
if (ec) {
std::cerr << "Error closing data acceptor: " << ec.message() << std::endl;
}
}
// delete data_socket;
}
});

View File

@ -48,6 +48,8 @@ public:
asio::ip::tcp::socket &getSocket();
asio::ip::tcp::acceptor &getAcceptor();
////////////////////////////////////////////////////////
// FTP command-socket
////////////////////////////////////////////////////////

View File

@ -8,149 +8,145 @@
namespace fineftp
{
FtpServerImpl::FtpServerImpl(const std::string& address, uint16_t port)
: port_ (port)
, address_ (address)
, acceptor_ (io_service_)
, open_connection_count_(0)
{}
FtpServerImpl::FtpServerImpl(const std::string &address, uint16_t port)
: port_(port), address_(address), acceptor_(io_service_), open_connection_count_(0)
{}
FtpServerImpl::~FtpServerImpl()
{
FtpServerImpl::~FtpServerImpl()
{
stop();
}
}
bool FtpServerImpl::addUser(const std::string& username, const std::string& password, const std::string& local_root_path, const Permission permissions)
{
bool FtpServerImpl::addUser(const std::string &username, const std::string &password,
const std::string &local_root_path, const Permission permissions)
{
return ftp_users_.addUser(username, password, local_root_path, permissions);
}
}
bool FtpServerImpl::addUserAnonymous(const std::string& local_root_path, const Permission permissions)
{
bool FtpServerImpl::addUserAnonymous(const std::string &local_root_path, const Permission permissions)
{
return ftp_users_.addUser("anonymous", "", local_root_path, permissions);
}
}
bool FtpServerImpl::start(size_t thread_count)
{
auto ftp_session = std::make_shared<FtpSession>(io_service_, ftp_users_, [this]() { open_connection_count_--; });
bool FtpServerImpl::start(size_t thread_count)
{
auto ftp_session = std::make_shared<FtpSession>(io_service_, ftp_users_, [this]() {
open_connection_count_--;
});
// set up the acceptor to listen on the tcp port
asio::error_code make_address_ec;
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(address_, make_address_ec), port_);
if (make_address_ec)
{
std::cerr << "Error creating address from string \"" << address_<< "\": " << make_address_ec.message() << std::endl;
return false;
}
{
asio::error_code ec;
acceptor_.open(endpoint.protocol(), ec);
if (ec)
{
std::cerr << "Error opening acceptor: " << ec.message() << std::endl;
if (make_address_ec) {
std::cerr << "Error creating address from string \"" << address_ << "\": " << make_address_ec.message()
<< std::endl;
return false;
}
}
{
asio::error_code ec;
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec)
{
std::cerr << "Error setting reuse_address option: " << ec.message() << std::endl;
return false;
}
asio::error_code ec;
acceptor_.open(endpoint.protocol(), ec);
if (ec) {
std::cerr << "Error opening acceptor: " << ec.message() << std::endl;
return false;
}
}
{
asio::error_code ec;
acceptor_.bind(endpoint, ec);
if (ec)
{
std::cerr << "Error binding acceptor: " << ec.message() << std::endl;
return false;
}
asio::error_code ec;
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec) {
std::cerr << "Error setting reuse_address option: " << ec.message() << std::endl;
return false;
}
}
{
asio::error_code ec;
acceptor_.listen(asio::socket_base::max_listen_connections, ec);
if (ec)
{
std::cerr << "Error listening on acceptor: " << ec.message() << std::endl;
return false;
}
asio::error_code ec;
acceptor_.bind(endpoint, ec);
if (ec) {
std::cerr << "Error binding acceptor: " << ec.message() << std::endl;
return false;
}
}
{
asio::error_code ec;
acceptor_.listen(asio::socket_base::max_listen_connections, ec);
if (ec) {
std::cerr << "Error listening on acceptor: " << ec.message() << std::endl;
return false;
}
}
#ifndef NDEBUG
std::cout << "FTP Server created." << std::endl << "Listening at address " << acceptor_.local_endpoint().address() << " on port " << acceptor_.local_endpoint().port() << ":" << std::endl;
std::cout << "FTP Server created." << std::endl
<< "Listening at address " << acceptor_.local_endpoint().address() << " on port "
<< acceptor_.local_endpoint().port() << ":" << std::endl;
#endif // NDEBUG
acceptor_.async_accept(ftp_session->getSocket()
, [=](auto ec)
{
open_connection_count_++;
acceptor_.async_accept(ftp_session->getSocket(), [=](auto ec) {
open_connection_count_++;
this->acceptFtpSession(ftp_session, ec);
});
this->acceptFtpSession(ftp_session, ec);
});
for (size_t i = 0; i < thread_count; i++)
{
thread_pool_.emplace_back([=] {io_service_.run(); });
for (size_t i = 0; i < thread_count; i++) {
thread_pool_.emplace_back([=] {
io_service_.run();
});
}
return true;
}
void FtpServerImpl::stop()
{
return true;
}
void FtpServerImpl::stop()
{
io_service_.stop();
for (std::thread& thread : thread_pool_)
{
thread.join();
for (std::thread &thread : thread_pool_) {
thread.join();
}
thread_pool_.clear();
}
}
void FtpServerImpl::acceptFtpSession(std::shared_ptr<FtpSession> ftp_session, asio::error_code const& error)
{
if (error)
{
void FtpServerImpl::acceptFtpSession(std::shared_ptr<FtpSession> ftp_session, asio::error_code const &error)
{
if (error) {
#ifndef NDEBUG
std::cerr << "Error handling connection: " << error.message() << std::endl;
std::cerr << "Error handling connection: " << error.message() << std::endl;
#endif
return;
return;
}
#ifndef NDEBUG
std::cout << "FTP Client connected: " << ftp_session->getSocket().remote_endpoint().address().to_string() << ":" << ftp_session->getSocket().remote_endpoint().port() << std::endl;
std::cout << "FTP Client connected: " << ftp_session->getSocket().remote_endpoint().address().to_string() << ":"
<< ftp_session->getSocket().remote_endpoint().port() << std::endl;
#endif
ftp_session->start();
auto new_session = std::make_shared<FtpSession>(io_service_, ftp_users_, [this]() { open_connection_count_--; });
auto new_session = std::make_shared<FtpSession>(io_service_, ftp_users_, [this]() {
open_connection_count_--;
});
acceptor_.async_accept(new_session->getSocket()
, [=](auto ec)
{
open_connection_count_++;
this->acceptFtpSession(new_session, ec);
});
}
int FtpServerImpl::getOpenConnectionCount()
{
return open_connection_count_;
}
uint16_t FtpServerImpl::getPort()
{
return acceptor_.local_endpoint().port();
}
std::string FtpServerImpl::getAddress()
{
return acceptor_.local_endpoint().address().to_string();
}
acceptor_.async_accept(new_session->getSocket(), [=](auto ec) {
open_connection_count_++;
this->acceptFtpSession(new_session, ec);
});
}
int FtpServerImpl::getOpenConnectionCount()
{
return open_connection_count_;
}
uint16_t FtpServerImpl::getPort()
{
return acceptor_.local_endpoint().port();
}
std::string FtpServerImpl::getAddress()
{
return acceptor_.local_endpoint().address().to_string();
}
} // namespace fineftp