Logo Search packages:      
Sourcecode: wesnoth-1.9 version File versions  Download package

network_manager_ana.cpp

Go to the documentation of this file.
/* $Id: network_manager_ana.cpp 46186 2010-09-01 21:12:38Z silene $ */

/**
 * @file
 * @brief Implementation file for network features using ana.
 *
 * Copyright (C) 2010 Guillermo Biset.
 *
 * Part of the Battle for Wesnoth Project http://www.wesnoth.org/
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY.
 *
 * See the COPYING file for more details.
 */

#include <iostream>

#include <boost/bind.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/gzip.hpp>

#include "serialization/parser.hpp"

#include "network_manager_ana.hpp"
#include "serialization/binary_or_text.hpp"

#include "gettext.hpp"

// Begin ana_send_handler implementation ----------------------------------------------------------

00036 ana_send_handler::ana_send_handler( size_t calls ) :
    mutex_(),
    target_calls_( calls ),
    error_code_()
{
    if ( calls > 0 )
        mutex_.lock();
}

00045 ana_send_handler::~ana_send_handler()
{
    if ( target_calls_ > 0 )
        throw std::runtime_error("Handler wasn't called enough times.");
    mutex_.lock();
    mutex_.unlock();
}

00053 void ana_send_handler::handle_send(ana::error_code   error_code,
                                   ana::net_id       /*client*/,
                                   ana::operation_id /*op_id*/)
{
    error_code_ = error_code;

    if ( --target_calls_ == 0 )
        mutex_.unlock();
}

00063 void ana_send_handler::wait_completion()
{
    mutex_.lock();
    mutex_.unlock();
}

// Begin ana_handshake_finisher_handler implementation --------------------------------------------

/**
 * Remember the server and the clients manager that handle_send() will notify.
 *
 * @param server Server used to disconnect a client whose send failed.
 * @param mgr    Manager told about a client whose send succeeded.
 */
ana_handshake_finisher_handler::ana_handshake_finisher_handler( ana::server*     server,
                                                                clients_manager* mgr) :
    server_( server ),
    manager_( mgr )
{
}

/** Nothing to release: the stored pointers are not deleted here. */
ana_handshake_finisher_handler::~ana_handshake_finisher_handler()
{
    // Intentionally empty.
}

/**
 * Completion callback for the send this handler was attached to.
 *
 * On success the client is reported to the manager as connected; on failure
 * it is disconnected from the server. The handler then deletes itself, so it
 * must not be touched after this call.
 *
 * @param ec     Result of the send operation.
 * @param client Client the data was sent to.
 */
void ana_handshake_finisher_handler::handle_send(ana::error_code   ec,
                                                 ana::net_id       client,
                                                 ana::operation_id /*op_id*/)
{
    if ( ! ec )
    {
        manager_->connected( client );
    }
    else
    {
        server_->disconnect( client );
    }

    delete this; // one-shot handler: it destroys itself after the callback
}

// Begin ana_receive_handler implementation -------------------------------------------------------

00096 ana_receive_handler::ana_receive_handler( ana_component_set::iterator iterator ) :
    iterator_( iterator ),
    mutex_(),
    handler_mutex_(),
    timeout_called_mutex_(),
    error_code_(),
    receive_timer_( NULL ),
    finished_( false )
{
    mutex_.lock();
    timeout_called_mutex_.lock();
}

00109 ana_receive_handler::~ana_receive_handler()
{
    timeout_called_mutex_.lock();
    timeout_called_mutex_.unlock();
    handler_mutex_.lock();
    handler_mutex_.unlock();
}

00117 void ana_receive_handler::wait_completion(ana::detail::timed_sender* component, size_t timeout_ms )
{
    {
        boost::mutex::scoped_lock lock( handler_mutex_);
        if ( finished_ )
        {
            mutex_.unlock();
            timeout_called_mutex_.unlock();
        }
        else if ( timeout_ms > 0 )
        {
            receive_timer_ = component->create_timer();

            receive_timer_->wait( ana::time::milliseconds(timeout_ms),
                                boost::bind(&ana_receive_handler::handle_timeout,
                                            this,
                                            ana::timeout_error ) );
        }
    }

    mutex_.lock();
    mutex_.unlock();
}

00141 void ana_receive_handler::handle_receive(ana::error_code          error_c,
                                         ana::net_id              client,
                                         ana::read_buffer read_buffer)
{
    boost::mutex::scoped_lock lock( handler_mutex_);

    delete receive_timer_;
    receive_timer_ = NULL;

    (*iterator_)->add_buffer( read_buffer, client );

    error_code_ = error_c;

    if (! finished_ )
    {
        finished_ = true;
        mutex_.unlock();
    }
}

00161 void ana_receive_handler::handle_disconnect(ana::error_code error_c, ana::net_id)
{
    boost::mutex::scoped_lock lock( handler_mutex_);

    delete receive_timer_;
    receive_timer_ = NULL;

    if (! finished_ )
    {
        error_code_ = error_c;

        finished_ = true;
        mutex_.unlock();
    }
}

/**
 * Timer callback.
 *
 * Under handler_mutex_: discards the timer and, if the handler has not
 * finished yet, records @p error_code, marks it finished and releases
 * mutex_. timeout_called_mutex_ is released in every case.
 *
 * @param error_code Error passed in by the timer.
 */
void ana_receive_handler::handle_timeout(ana::error_code error_code)
{
    boost::mutex::scoped_lock guard( handler_mutex_ );

    delete receive_timer_;
    receive_timer_ = NULL;

    const bool first_to_finish = ! finished_;
    if ( first_to_finish )
    {
        error_code_ = error_code;
        finished_   = true;
        mutex_.unlock();
    }

    timeout_called_mutex_.unlock();
}

// Begin ana_multiple_receive_handler implementation ----------------------------------------------

00197 ana_multiple_receive_handler::ana_multiple_receive_handler( ana_component_set& components ) :
    components_( components ),
    mutex_(),
    handler_mutex_(),
    timeout_called_mutex_(),
    error_code_(),
    buffer_(),
    wesnoth_id_(0),
    receive_timer_( NULL ),
    finished_( false )
{
    throw std::runtime_error("Multiple receive handler constructed");

    ana_component_set::iterator it;

    for (it = components_.begin(); it != components_.end(); ++it )
    {
        if ( (*it)->is_server() )
            (*it)->server()->set_listener_handler( this );
        else
            (*it)->client()->set_listener_handler( this );
    }

    mutex_.lock();
    timeout_called_mutex_.lock();
}

00224 ana_multiple_receive_handler::~ana_multiple_receive_handler()
{
    timeout_called_mutex_.lock();
    timeout_called_mutex_.unlock();
    handler_mutex_.lock();
    handler_mutex_.unlock();
}

00232 void ana_multiple_receive_handler::wait_completion(size_t timeout_ms )
{
    ana_component_set::iterator it;
    {
        boost::mutex::scoped_lock lock( handler_mutex_);

        it = components_.begin();

        ana::detail::timed_sender* component;

        if ( (*it)->is_server())
            component = (*it)->server();
        else
            component = (*it)->client();

        if ( finished_ )
        {
            mutex_.unlock();
            timeout_called_mutex_.unlock();
        }
        else if ( timeout_ms > 0 )
        {
            receive_timer_ = component->create_timer();

            receive_timer_->wait( ana::time::milliseconds(timeout_ms),
                                boost::bind(&ana_multiple_receive_handler::handle_timeout,
                                            this,
                                            ana::timeout_error ) );
        }
    }
    mutex_.lock();
    mutex_.unlock();
}

00266 void ana_multiple_receive_handler::handle_receive(ana::error_code          error_c,
                                                  ana::net_id              id,
                                                  ana::read_buffer read_buffer)
{
    boost::mutex::scoped_lock lock( handler_mutex_);

    delete receive_timer_;
    receive_timer_ = NULL;

    buffer_ = read_buffer;
    error_code_ = error_c;

    ana_component_set::iterator it;
    it = std::find_if( components_.begin(), components_.end(),
                    boost::bind(&ana_component::get_id, _1) == id );

    if ( it != components_.end())
        wesnoth_id_ = (*it)->get_wesnoth_id();
    else
        throw std::runtime_error("Wrong read.");


    if (! finished_ )
    {
        finished_ = true;
        mutex_.unlock();
    }
}

00295 void ana_multiple_receive_handler::handle_disconnect(ana::error_code error_c, ana::net_id)
{
    boost::mutex::scoped_lock lock( handler_mutex_);

    delete receive_timer_;
    receive_timer_ = NULL;

    error_code_ = error_c;
    if (! finished_ )
    {
        finished_ = true;
        mutex_.unlock();
    }
}

/**
 * Timer callback.
 *
 * Under handler_mutex_: discards the timer and, if the handler has not
 * finished yet, records @p error_code, marks it finished and releases
 * mutex_. timeout_called_mutex_ is released in every case.
 *
 * @param error_code Error passed in by the timer.
 */
void ana_multiple_receive_handler::handle_timeout(ana::error_code error_code)
{
    boost::mutex::scoped_lock guard( handler_mutex_ );

    delete receive_timer_;
    receive_timer_ = NULL;

    const bool first_to_finish = ! finished_;
    if ( first_to_finish )
    {
        error_code_ = error_code;
        finished_   = true;
        mutex_.unlock();
    }

    timeout_called_mutex_.unlock();
}

// Begin ana_connect_handler implementation -------------------------------------------------------

00329 ana_connect_handler::ana_connect_handler( ) :
    mutex_( ),
    error_code_()
{
    mutex_.lock();
}

00336 ana_connect_handler::~ana_connect_handler()
{
    mutex_.lock();
    mutex_.unlock();
}

00342 const ana::error_code& ana_connect_handler::error() const
{
    return error_code_;
}

00347 void ana_connect_handler::handle_connect(ana::error_code error_code, ana::net_id /*client*/)
{
    error_code_ = error_code;
    mutex_.unlock();
}

00353 void ana_connect_handler::wait_completion()
{
    mutex_.lock();
    mutex_.unlock();
}

// Begin ana_component implementation -------------------------------------------------------------

00361 ana_component::ana_component( ) :
    base_( ana::server::create() ),
    is_server_( true ),
    id_( server()->id() ),
    wesnoth_id_( 0 ),
    mutex_(),
    condition_(),
    buffers_(),
    sender_ids_()
{
}

00373 ana_component::ana_component( const std::string& host, const std::string& port) :
    base_( ana::client::create(host,port) ),
    is_server_( false ),
    id_(  client()->id() ),
    wesnoth_id_( 0 ),       // will change to the received id after connection
    mutex_(),
    condition_(),
    buffers_(),
    sender_ids_()
{
}

/** Delete the owned ana object: the server or the client, whichever this component wraps. */
ana_component::~ana_component( )
{
    if ( is_client() )
    {
        delete client();
    }
    else
    {
        delete server();
    }
}

00393 network::statistics ana_component::get_send_stats() const
{
    ana::stats_collector& stats = listener()->stats_collector();

    network::statistics result;

    result.current     = stats.current_packet_out_total();
    result.current_max = stats.current_packet_out_size();
    result.total       = stats.get_stats( ana::ACCUMULATED )->bytes_out();

    return result;
}

00406 network::statistics ana_component::get_receive_stats() const
{
    ana::stats_collector& stats = listener()->stats_collector();

    network::statistics result;

    result.current     = stats.current_packet_in_total();
    result.current_max = stats.current_packet_in_size();
    result.total       = stats.get_stats( ana::ACCUMULATED )->bytes_in();

    return result;
}

00419 ana::server* ana_component::server() const
{
    if( ! is_server_ )
        throw std::runtime_error("Component is not a server.");

    return boost::get<ana::server*>(base_);
}

00427 ana::client* ana_component::client() const
{
    if( is_server_ )
        throw std::runtime_error("Component is not a client.");

    return boost::get<ana::client*>(base_);
}

00435 ana::detail::listener* ana_component::listener() const
{
    if( is_server_ )
        return server();
    else
        return client();
}

00443 bool ana_component::is_server() const
{
    return is_server_;
}

00448 bool ana_component::is_client() const
{
    return ! is_server_;
}

00453 ana::net_id ana_component::get_id() const
{
    return id_;
}

/** @return The wesnoth id: 0 after construction, otherwise the last value given to set_wesnoth_id(). */
network::connection ana_component::get_wesnoth_id() const
{
    const network::connection assigned_id = wesnoth_id_;
    return assigned_id;
}

/**
 * Store the wesnoth id of this component.
 *
 * @param id New wesnoth id.
 */
void ana_component::set_wesnoth_id( network::connection id )
{
    this->wesnoth_id_ = id;
}

00468 const ana::stats* ana_component::get_stats( ana::stat_type type ) const
{
    return listener()->get_stats( type );
}

00473 void ana_component::add_buffer(ana::read_buffer buffer, ana::net_id id)
{
    {
        boost::lock_guard<boost::mutex> lock(mutex_);
        buffers_.push( buffer );

        if ( is_server_ )
            sender_ids_.push( id );
    }
    condition_.notify_all();
}

00485 ana::read_buffer ana_component::wait_for_element()
{
    boost::unique_lock<boost::mutex> lock(mutex_);

    while(buffers_.empty())
        condition_.wait(lock);

    const ana::read_buffer buffer_ret = buffers_.front();

    buffers_.pop();

    return buffer_ret;
}

00499 network::connection ana_component::oldest_sender_id_still_pending()
{
    boost::unique_lock<boost::mutex> lock(mutex_);

    if ( sender_ids_.empty())
        throw std::runtime_error("No pending buffer.");

    const network::connection id = sender_ids_.front();

    sender_ids_.pop();

    return id;
}

// Begin clients_manager  implementation ----------------------------------------------------------

00515 clients_manager::clients_manager( ana::server* server) :
    server_( server ),
    ids_(),
    pending_ids_(),
    pending_handshakes_()
{
}

00523 size_t clients_manager::client_amount() const
{
    return ids_.size();
}

00528 void clients_manager::handle_connect(ana::error_code error, ana::net_id client)
{
    if (! error )
    {
        ids_.insert( client );
        pending_handshakes_.insert( client );
    }
}

/**
 * Disconnection callback: forget everything known about @p client.
 *
 * The id is removed from the registered clients, from the connections that
 * were not yet reported, and from the clients still waiting for a handshake.
 * The last erase keeps this in line with remove(); without it a client that
 * drops before finishing its handshake would stay in pending_handshakes_ and
 * is_pending_handshake() would keep answering true for it.
 *
 * @param client Id of the client that disconnected.
 */
void clients_manager::handle_disconnect(ana::error_code /*error*/, ana::net_id client)
{
    ids_.erase( client );
    pending_ids_.erase( network::connection( client ) );
    pending_handshakes_.erase( client );
}

/**
 * Queue @p id as a connection that new code can pick up through
 * get_pending_connection_id().
 *
 * @param id Id of the client to report.
 */
void clients_manager::connected( ana::net_id id )
{
    const network::connection reported( id );
    pending_ids_.insert( reported );
}

/**
 * Drop @p id from all three bookkeeping sets: registered clients, pending
 * connections and pending handshakes.
 *
 * @param id Id of the client to forget.
 */
void clients_manager::remove( ana::net_id id )
{
    const network::connection as_connection( id );

    ids_.erase( id );
    pending_ids_.erase( as_connection );
    pending_handshakes_.erase( id );
}


/**
 * Mark the handshake of @p id as done by removing it from the pending set.
 *
 * @param id Id of the client whose handshake finished.
 */
void clients_manager::handshaked( ana::net_id id )
{
    this->pending_handshakes_.erase( id );
}

/** @return true if at least one connection is waiting in pending_ids_. */
bool clients_manager::has_connection_pending() const
{
    const bool none_waiting = pending_ids_.empty();
    return ! none_waiting;
}

/**
 * @param id Client id to look up.
 * @return true if @p id is in the set of clients still waiting for a handshake.
 */
bool clients_manager::is_pending_handshake( ana::net_id id ) const
{
    const bool found = ( pending_handshakes_.find( id ) != pending_handshakes_.end() );
    return found;
}

/**
 * @param id Client id to look up.
 * @return true if @p id is one of the registered clients.
 */
bool clients_manager::is_a_client( ana::net_id id ) const
{
    const bool found = ( ids_.find( id ) != ids_.end() );
    return found;
}

/**
 * Remove and return the first pending connection id.
 *
 * @return The first element of pending_ids_, or 0 when nothing is pending.
 *         Returning 0 instead of dereferencing begin() of an empty set
 *         avoids undefined behaviour for callers that did not check
 *         has_connection_pending() first.
 */
network::connection clients_manager::get_pending_connection_id()
{
    if ( pending_ids_.empty() )
        return 0;

    const network::connection result = *pending_ids_.begin();
    pending_ids_.erase( pending_ids_.begin() );
    return result;
}

// Begin ana_network_manager implementation -------------------------------------------------------

00585 ana_network_manager::ana_network_manager() :
    connect_timer_( NULL ),
    components_(),
    server_manager_(),
    disconnected_components_(),
    disconnected_ids_()
{
}

00594 ana::net_id ana_network_manager::create_server( )
{
    ana_component* new_component = new ana_component( );
    components_.insert( new_component );

    ana::server* server = new_component->server();

    clients_manager* manager = new clients_manager( server );
    server_manager_[ server ] = manager;

    server->set_connection_handler( manager );
    server->set_listener_handler( this );
    server->set_raw_data_mode();

    return server->id();
}

00611 network::connection ana_network_manager::create_client_and_connect(std::string host, int port)
{
    ana::net_id new_client_id = ana::invalid_net_id;

    try
    {
        std::stringstream ss;
        ss << port;

        ana_component* new_component = new ana_component( host, ss.str() );
        components_.insert( new_component );

        ana::client* const client = new_component->client();

        new_client_id = client->id();

        ana_connect_handler handler;

        client->set_raw_data_mode();
        client->set_connect_timeout( ana::time::seconds(10) );
        client->connect( &handler );
        client->set_listener_handler( this );
        client->run();

        handler.wait_completion(); // just wait for handler to finish

        if( handler.error() )
        {
            network::disconnect( client->id() );
            throw network::error(_("Could not connect to host"), client->id() );
        }
        else
        {
            //Send handshake
            ana::serializer::bostream bos;

            ana::ana_uint32 handshake( 0 );
            bos << handshake;

            ana_send_handler send_handler;

            client->send( ana::buffer( bos.str()), &send_handler); //, ana::ZERO_COPY );

            send_handler.wait_completion();

            if ( send_handler.error() )
                throw network::error(_("Could not connect to host"), client->id() );
            else
            {
                ana::ana_uint32 my_id;
                ana::serializer::bistream bis;

                client->wait_raw_object(bis, sizeof(ana::ana_uint32) );

                bis >> my_id;
                ana::network_to_host_long( my_id );

                new_component->set_wesnoth_id( my_id );

                client->set_header_first_mode();
                client->run_listener();

                return network::connection( client->id() );
            }
        }
    }
    catch( const std::exception& e )
    {
        throw network::error(_("Could not connect to host"), new_client_id );
        return 0;
    }
}

/**
 * Report one newly connected client, if any server has one pending.
 *
 * Server components are visited in set order; the first one whose clients
 * manager has a pending connection supplies the result. The manager is
 * looked up with find() so that a server without a registered manager is
 * skipped instead of inserting a NULL entry into the map and dereferencing
 * it.
 *
 * @return The id of a pending connection, or 0 when there is none.
 */
network::connection ana_network_manager::new_connection_id( )
{
    ana_component_set::iterator it;

    for (it = components_.begin(); it != components_.end(); ++it)
    {
        if ( ! (*it)->is_server() )
            continue;

        const std::map< ana::server*, clients_manager* >::iterator mgr =
            server_manager_.find( (*it)->server() );

        if ( mgr == server_manager_.end() || mgr->second == NULL )
            continue;

        if ( mgr->second->has_connection_pending() )
            return mgr->second->get_pending_connection_id();
    }

    // No new connection
    return 0;
}


00703 const ana::stats* ana_network_manager::get_stats( network::connection connection_num,
                                                  ana::stat_type      type)
{
    ana::net_id id( connection_num );
    std::set<ana_component*>::iterator it;

    if ( id == 0 )
    {
        if ( ! components_.empty() )
        {
            it = components_.begin();
            return (*it)->get_stats( type );
        }
        else
            return NULL;
    }
    else
    {
        it = std::find_if( components_.begin(), components_.end(),
                           boost::bind(std::logical_or<bool>(),
                           (boost::bind(&ana_component::get_wesnoth_id, _1) == connection_num),
                           (boost::bind(&ana_component::get_id, _1) == id ) ));
        //Make a broad attempt at finding it, test for both ANA's id and the assigned one.

        if ( it != components_.end())
            return (*it)->get_stats( type );
        else
        {
            for ( it = components_.begin() ; it != components_.end(); ++it)
            {
                if ( (*it)->is_server() )
                {
                    const ana::stats* res = (*it)->server()->get_client_stats(id,ana::ACCUMULATED);
                    if ( res != NULL )
                        return res;
                }
            }
        }

        return NULL;
    }
}

00746 void ana_network_manager::close_connections_and_cleanup()
{
    for (ana_component_set::iterator it = components_.begin(); it != components_.end(); ++it)
        delete *it;

    std::map< ana::server*, clients_manager* >::iterator it;
    for ( it = server_manager_.begin(); it != server_manager_.end(); ++it)
        delete it->second;

    components_.clear();
    server_manager_.clear();
}

00759 void ana_network_manager::throw_if_pending_disconnection()
{
    if ( ! disconnected_components_.empty() )
    {
        ana_component* component = disconnected_components_.front();
        disconnected_components_.pop();

        const ana::net_id id = component->get_id(); // Should I use wesnoth_id here?

        delete component;

        throw network::error(_("Client disconnected"),id);
    }

    if ( ! disconnected_ids_.empty() )
    {
        ana::net_id id = disconnected_ids_.front();
        disconnected_ids_.pop();
        throw network::error(_("Client disconnected"),id);
    }
}

00781 void ana_network_manager::run_server(ana::net_id id, int port)
{
    std::stringstream ss;
    ss << port;

    ana_component_set::iterator it;

    it = std::find_if( components_.begin(), components_.end(),
                        boost::bind(&ana_component::get_id, _1) == id );

    if ( it == components_.end())
        throw std::runtime_error("No server with this id.");
    else
        if ( (*it)->is_server() )
            (*it)->server()->run( ss.str() );
        else
            throw std::runtime_error("Component is not a server.");

}

00801 std::string ana_network_manager::ip_address( network::connection id )
{
    std::set<ana_component*>::iterator it;

    for (it = components_.begin(); it != components_.end(); ++it)
    {
        if ( (*it)->is_server() )
        {
            const std::string ip = (*it)->server()->ip_address( ana::net_id( id ) );
            if  (ip != "")
                return ip;
        }
    }
    return "";
}

00817 size_t ana_network_manager::number_of_connections() const
{
    // TODO:check if this is the intention

    size_t total(0);

    ana_component_set::const_iterator it;

    for (it = components_.begin(); it != components_.end(); ++it )
    {
        if ((*it)->is_client())
            ++total;
        else
        {
            std::map< ana::server*, clients_manager* >::const_iterator mgr = server_manager_.find(
(*it)->server() );
            total += mgr->second->client_amount();
        }
    }

    return total;
}


/**
 * Serialise and gzip-compress @p cfg into a string.
 *
 * @param cfg Configuration to compress.
 * @return Contents of the stream filled by the two-argument overload.
 */
std::string ana_network_manager::compress_config( const config& cfg )
{
    std::ostringstream compressed;

    compress_config( cfg, compressed );

    const std::string bytes = compressed.str( );
    return bytes;
}


00849 void ana_network_manager::compress_config( const config& cfg, std::ostringstream& out)
{
    boost::iostreams::filtering_stream<boost::iostreams::output> filter;
    filter.push(boost::iostreams::gzip_compressor());
    filter.push(out);
    write(filter, cfg);
    out.flush();
}


00859 void ana_network_manager::read_config( const ana::read_buffer& buffer, config& cfg)
{
    std::istringstream input( buffer->string() );

    read_gz(cfg, input);
}

00866 size_t ana_network_manager::send_all( const config& cfg, bool /*zipped*/ )
{
    const std::string output_string = compress_config(cfg);

    std::set<ana_component*>::iterator it;

    for (it = components_.begin(); it != components_.end(); ++it)
    {
        if ( (*it)->is_server() )
        {
            const size_t necessary_calls = server_manager_[ (*it)->server() ]->client_amount();
            ana_send_handler handler( necessary_calls );

            (*it)->server()->send_all( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY);
            handler.wait_completion(); // the handler will release the mutex after
                                       // necessary_calls calls
        }
        else
        {
            ana_send_handler handler;

            (*it)->client()->send( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY );
            handler.wait_completion();
        }
    }
    return output_string.size();
}

00894 size_t ana_network_manager::send( network::connection connection_num ,
                                  const config&       cfg,
                                  bool                zipped )
{
    if ( ! zipped )
        throw std::runtime_error("All send operations should be zipped");

    const std::string output_string = compress_config(cfg);


    return send_raw_data( output_string.c_str(), output_string.size(), connection_num );
}

size_t ana_network_manager::send_raw_data( const char*         base_char,
                                           size_t              size,
                                           network::connection connection_num )
{
    ana::net_id id( connection_num );
    ana_component_set::iterator it;

    it = std::find_if( components_.begin(), components_.end(),
                   boost::bind(std::logical_or<bool>(),
                       (boost::bind(&ana_component::get_wesnoth_id, _1) == connection_num),
                       (boost::bind(&ana_component::get_id, _1) == id ) ));
    //Make a broad attempt at finding it, test for both ANA's id and the assigned one.

    if ( it != components_.end())
    {
        if ( (*it)->is_server() )
            throw std::runtime_error("Can't send to the server itself.");

        ana_send_handler handler;
        (*it)->client()->send( ana::buffer( base_char, size ), &handler); //, ana::ZERO_COPY);
        handler.wait_completion();

        if ( handler.error() )
            return 0;
        else
            return size;
    }
    else
    {
        if ( components_.size() == 0 )
            return 0;
        else
        {
            it = components_.begin();

            if ((*it)->is_server())
            {
                ana_send_handler handler;                   //, ana::ZERO_COPY);
                (*it)->server()->send_one( id, ana::buffer( base_char, size ), &handler);
                handler.wait_completion();
                if ( handler.error() )
                    return 0;
                else
                    return size;
            }
            else
            {
                ana_send_handler handler;
                (*it)->client()->send( ana::buffer( base_char, size ), &handler);
                handler.wait_completion();

                if ( handler.error() )
                    return 0;
                else
                    return size;
            }
        }
    }
}

void ana_network_manager::send_all_except(const config& cfg, network::connection connection_num)
{
    const std::string output_string = compress_config(cfg);


    ana_component_set::iterator it;

    ana::net_id id_to_avoid( connection_num ); //I should have issued this id earlier

    for ( it = components_.begin(); it != components_.end(); ++it)
    {
        if ((*it)->is_server())
        {
            if ( (*it)->get_id() != id_to_avoid )
            {
                if ( server_manager_[ (*it)->server() ]->is_a_client( id_to_avoid ) )
                {
                    const size_t clients_receiving_number
                                = server_manager_[ (*it)->server() ]->client_amount() - 1;

                    ana_send_handler handler( clients_receiving_number );
                    (*it)->server()->send_all_except( id_to_avoid,
                                                      ana::buffer( output_string ),
                                                      &handler); //, ana::ZERO_COPY);
                    handler.wait_completion();
                }
            }
        }
        else
        {
            if ( (*it)->get_wesnoth_id() != connection_num )
            {
                ana_send_handler handler;
                (*it)->client()->send( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY);
                handler.wait_completion();
            }
        }
    }
}

01007 network::connection ana_network_manager::read_from_ready_buffer(
                                         const ana_component_set::iterator& it, config& cfg)
{
    read_config( (*it)->wait_for_element(), cfg);

    return (*it)->get_wesnoth_id();
}

01015 network::connection ana_network_manager::read_from( const ana_component_set::iterator& it,
                                                    config&             cfg,
                                                    size_t              timeout_ms)
{
    if (  (*it)->new_buffer_ready() )
        return read_from_ready_buffer( it, cfg );
    else if (timeout_ms == 0 )
        return 0;
    else
    {
        ana_receive_handler handler(it);
        (*it)->listener()->set_listener_handler( &handler );

        if (  (*it)->new_buffer_ready() )
            return read_from_ready_buffer( it, cfg );
        else
        {
            if ( (*it)->is_server() )
                handler.wait_completion( (*it)->server(), timeout_ms );
            else
                handler.wait_completion( (*it)->client(), timeout_ms );
        }

        (*it)->listener()->set_listener_handler( this );

        if ( handler.error() )
            return 0;
        else
            return read_from_ready_buffer( it, cfg );
    }
}

01047 network::connection ana_network_manager::read_from( network::connection connection_num,
                                                    config&             cfg,
                                                    size_t              timeout_ms)
{
    if ( components_.empty() )
        return 0;

    ana_component_set::iterator it;

    if ( connection_num == 0 )
    {
        if ( components_.size() == 1 )
            return read_from( components_.begin(), cfg, timeout_ms );
        else
        {
            //Check first if there is an available buffer
            for (it = components_.begin(); it != components_.end(); ++it)
                if (  (*it)->new_buffer_ready() )
                    return read_from_ready_buffer( it, cfg );

            // If no timeout was requested, return
            if (timeout_ms == 0 )
                return 0;

            // Wait timeout_ms milliseconds to see if any component will receive something
            ana_multiple_receive_handler handler( components_ );

            for (it = components_.begin(); it != components_.end(); ++it )
                (*it)->listener()->set_listener_handler( &handler );

            handler.wait_completion( timeout_ms );

            for (it = components_.begin(); it != components_.end(); ++it )
                (*it)->listener()->set_listener_handler( this );

            if ( handler.error() )
            {
                for (it = components_.begin(); it != components_.end(); ++it)
                    if (  (*it)->new_buffer_ready() )
                        return read_from_ready_buffer( it, cfg );

                // So nothing was read:
                return 0;
            }
            else
            {
                read_config( handler.buffer(), cfg);
                return handler.get_wesnoth_id();
            }
        }
    }
    else
    {
        ana::net_id id( connection_num );

        it = std::find_if( components_.begin(), components_.end(),
                    boost::bind(std::logical_or<bool>(),
                        (boost::bind(&ana_component::get_wesnoth_id, _1) == connection_num),
                        (boost::bind(&ana_component::get_id, _1) == id ) ) );
        //Make a broad attempt at finding it, test for both ANA's id and the assigned one.


        if ( it != components_.end())
            return read_from(it, cfg, timeout_ms);
        else
            throw std::runtime_error("Trying a network read from an invalid component id.");
    }
}

/**
 * Append the raw bytes of the first ready buffer to @p vec.
 *
 * Components are visited in set order and only the first one with a ready
 * buffer is read. The bytes are appended with a single range insert instead
 * of one push_back per byte.
 *
 * @param vec Destination; existing contents are kept.
 * @return For a client component, its wesnoth id; for a server component,
 *         the id of the sender of the buffer; 0 if no buffer was ready.
 */
network::connection ana_network_manager::read_from_all( std::vector<char>& vec)
{
    ana_component_set::iterator it;

    for (it = components_.begin(); it != components_.end(); ++it)
    {
        if ( ! (*it)->new_buffer_ready() )
            continue;

        ana::read_buffer buffer = (*it)->wait_for_element();

        // copy the buffer
        const char* const first = buffer->base_char();
        vec.insert( vec.end(), first, first + buffer->size() );

        if ( (*it)->is_client() )
            return (*it)->get_wesnoth_id();
        else
            return (*it)->oldest_sender_id_still_pending();
    }

    // there wasn't any buffer ready
    return 0;
}

01144 network::statistics ana_network_manager::get_send_stats(network::connection handle)
{
    if ( handle != 0 )
    {
//         ana::net_id id( handle );
        std::set< ana_component* >::iterator it;

        it = std::find_if( components_.begin(), components_.end(),
                            boost::bind(&ana_component::get_wesnoth_id, _1) == handle );

        if ( it != components_.end() )
            return (*it)->get_send_stats( );
        else
            throw std::runtime_error("Trying to get stats from the wrong component.");
    }
    else if( ! components_.empty() )
    {
        std::set< ana_component* >::iterator it = components_.begin();
        return (*it)->get_send_stats();
    }
    else
        return network::statistics();
}

01168 network::statistics ana_network_manager::get_receive_stats(network::connection handle)
{
    if ( handle != 0 )
    {
        ana::net_id id( handle );
        std::set< ana_component* >::iterator it;

        it = std::find_if( components_.begin(), components_.end(),
                            boost::bind(&ana_component::get_id, _1) == id );

        if ( it != components_.end() )
            return (*it)->get_receive_stats( );
        else
            throw std::runtime_error("Received message from a non connected component.");
    }
    else if( ! components_.empty() )
    {
        std::set< ana_component* >::iterator it = components_.begin();
        return (*it)->get_receive_stats();
    }
    else
        return network::statistics();
}

01192 bool ana_network_manager::disconnect( network::connection handle)
{
    if ( handle == 0 )
        close_connections_and_cleanup();
    else
    {
        ana::net_id id( handle );
        ana_component_set::iterator it;

        it = std::find_if( components_.begin(), components_.end(),
                       boost::bind(std::logical_or<bool>(),
                           (boost::bind(&ana_component::get_wesnoth_id, _1) == handle),
                           (boost::bind(&ana_component::get_id, _1) == id ) ));
        //Make a broad attempt at finding it, test for both ANA's id and the assigned one.

        if ( it == components_.end())
            throw std::runtime_error("Trying to disconnect an invalid component.");
        else
        {
            if ( (*it)->is_server() )
                throw std::runtime_error("Can't disconnect server directly.");
            else
                (*it)->client()->disconnect();
        }
    }
    return true;
}

01220 void ana_network_manager::handle_send(ana::error_code error_code,
                                      ana::net_id client,
                                      ana::operation_id /*op_id*/)
{
    if ( error_code )
        network::disconnect( client );
}

01228 void ana_network_manager::handle_receive( ana::error_code          error,
                                          ana::net_id              client,
                                          ana::read_buffer buffer)
{
    if (error)
        network::disconnect( client );
    else
    {
        std::set< ana_component* >::iterator it;

        it = std::find_if( components_.begin(), components_.end(),
                            boost::bind(&ana_component::get_id, _1) == client );

        if ( it != components_.end() )
            (*it)->add_buffer( buffer, client );
        else
        {
            if (components_.empty() )
                throw std::runtime_error("Received a message while no component was running.\n");

            std::map< ana::server*, clients_manager* >::iterator mgrs;

            for ( mgrs = server_manager_.begin(); mgrs != server_manager_.end(); ++mgrs)
            {
                if (mgrs->second->is_a_client( client ) ) // Is this your client?
                {
                    if ( mgrs->second->is_pending_handshake( client ) ) // Did he login already?
                    {
                        // all handshakes are 4 bytes long
                        if ( buffer->size() != sizeof(ana::ana_uint32) )
                            mgrs->first->disconnect( client );

                        ana::ana_uint32 handshake;
                        {
                            ana::serializer::bistream bis( buffer->string() );

                            bis >> handshake;
                            ana::network_to_host_long( handshake ); //I'm expecting a 0 anyway
                        }

                        if ( handshake != 0 )
                            mgrs->first->disconnect( client );
                        else
                        {
                            mgrs->second->handshaked( client );
                            //send back it's id
                            ana::serializer::bostream bos;
                            ana::ana_uint32 network_byte_order_id = client;
                            ana::host_to_network_long( network_byte_order_id );
                            bos << network_byte_order_id;

                            ana_handshake_finisher_handler* handler
                                = new ana_handshake_finisher_handler( mgrs->first,
                                                                      mgrs->second);

                            mgrs->first->send_one(client, ana::buffer( bos.str() ), handler );
                            mgrs->first->set_header_first_mode( client );
                        }
                    }
                    else // just add the buffer to the associated clients_manager
                    {
                        ana::net_id server_id = mgrs->first->id();

                        it = std::find_if( components_.begin(), components_.end(),
                                        boost::bind(&ana_component::get_id, _1) == server_id );

                        if ( (*it)->is_client() )
                            throw std::runtime_error("Wrong id to receive from.");

                        (*it)->add_buffer( buffer, client );
                    }
                }
            }
        }
    }
}

01305 bool ana_component::new_buffer_ready() // non const due to mutex block
{
    boost::mutex::scoped_lock lock( mutex_ );

    return ! buffers_.empty();
}


01313 void ana_network_manager::handle_disconnect(ana::error_code /*error_code*/, ana::net_id client)
{
    std::set< ana_component* >::iterator it;

    it = std::find_if( components_.begin(), components_.end(),
                        boost::bind(&ana_component::get_id, _1) == client );

    if ( it != components_.end() )
    {
        disconnected_components_.push( *it );
        components_.erase(it);
    }
    else
    {
        for (it = components_.begin(); it != components_.end(); it++ )
            if ( (*it)->is_server() )
                if ( server_manager_[ (*it)->server() ]->is_a_client( client ) )
                {
                    server_manager_[ (*it)->server() ]->remove( client );
                    disconnected_ids_.push( client );
                }
    }
}

Generated by  Doxygen 1.6.0   Back to index