aesop-srv.cpp

Go to the documentation of this file.
00001 /*
00002  * aesop-srv.cpp
00003  *
00004  * Copyright (C) 2007-2010  Thomas A. Vaughan
00005  * All rights reserved.
00006  *
00007  *
00008  * Redistribution and use in source and binary forms, with or without
00009  * modification, are permitted provided that the following conditions are met:
00010  *     * Redistributions of source code must retain the above copyright
00011  *       notice, this list of conditions and the following disclaimer.
00012  *     * Redistributions in binary form must reproduce the above copyright
00013  *       notice, this list of conditions and the following disclaimer in the
00014  *       documentation and/or other materials provided with the distribution.
00015  *     * Neither the name of the <organization> nor the
00016  *       names of its contributors may be used to endorse or promote products
00017  *       derived from this software without specific prior written permission.
00018  *
00019  * THIS SOFTWARE IS PROVIDED BY THOMAS A. VAUGHAN ''AS IS'' AND ANY
00020  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00022  * DISCLAIMED. IN NO EVENT SHALL THOMAS A. VAUGHAN BE LIABLE FOR ANY
00023  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00024  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00026  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00027  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00028  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029  *
00030  *
00031  * Implementation of the basic aesop server.  See aesop-srv.h
00032  *
00033  * WARNING: try not to put anything interesting in here.  The Server
00034  * object is mostly just a simple coordinator.  Any rich state etc. should
00035  * be handled by other objects.  Some objects may also need to run on the
00036  * client, for example.
00037  */
00038 
00039 // includes -------------------------------------------------------------------
00040 #include "aesop-srv.h"          // always include our own header first!
00041 
00042 #include <math.h>
00043 
00044 //#include "srv-connection-mgr.h"       // connection management
00045 #include "srv-converse.h"       // conversations we manage
00046 #include "srv-hosts.h"          // host management
00047 #include "srv-msg-router.h"     // message router
00048 #include "srv-physics.h"        // physics messages
00049 #include "srv-players.h"        // player management
00050 #include "srv-users.h"          // user management
00051 
00052 #include "common/wave_ex.h"
00053 #include "conversation/conversation.h"  // conversation manager
00054 #include "aesop-proto/message.h"
00055 #include "aesop-proto/protocol.h"
00056 #include "crypto/crypto.h"
00057 #include "datahash/datahash_util.h"
00058 #include "map-dynamics/map-dynamics.h"
00059 #include "map-manager/map-manager.h"
00060 #include "netrq/netrq.h"                // network request queue
00061 #include "perf/perf.h"
00062 #include "story/story.h"
00063 #include "threadpool/threadpool.h"
00064 #include "threadsafe/threadsafe_counter.h"
00065 #include "threadsafe/threadsafe_map.h"
00066 #include "threadsafe/threadsafe_queue.h"
00067 #include "xdrbuf/xdrbuf.h"
00068 
00069 
00070 namespace aesop {
00071 
00072 
00073 // public interface destructor implementation
00074 Server::~Server(void) throw() { }
00075 
00076 
00077 typedef threadsafe_map<int, std::string> name_map_t;
00078 
00079 
00080 // interface between handlers and thread pool
00081 class handler_intercept_t {
00082 public:
00083         // constructor, manipulators
00084         handler_intercept_t(IN const threadsafe_refcount& in_trc) throw() :
00085                 trc(in_trc) {
00086                         fn = NULL;
00087                         state.clear();
00088                         map = NULL;
00089                 }
00090 
00091         // public data fields
00092         msg_handler_fn          fn;
00093         handler_state_t         state;
00094         name_map_t *            map;
00095 
00096 private:
00097         // private data fields
00098         threadsafe_refcount     trc;    // server keeps track of worker counts
00099 };
00100 
00101 
00102 
00103 typedef std::vector<conn_id_t> vec_conn_t;
00104 
00105 
00106 // should be long enough to hold most objects and requests, but not insane...
00107 static const int s_maxNetQueueLength            = 256;
00108 
00109 // max size of a broadcast message (keep small!)
00110 static const int s_bcastBuffer                  = 128;
00111 
00112 
00113 ////////////////////////////////////////////////////////////////////////////////
00114 //
00115 //      static helper methods
00116 //
00117 ////////////////////////////////////////////////////////////////////////////////
00118 
00119 static void
00120 interceptHandler
00121 (
00122 IN void * context
00123 )
00124 {
00125         perf::Timer timer("aesop_srv::interceptHandler");
00126         ASSERT(context, "null");
00127         handler_intercept_t * ctx = (handler_intercept_t *) context;
00128         ASSERT(ctx, "null");
00129 
00130         // wrap context in a smart pointer so it cleans up on exit
00131         smart_ptr<handler_intercept_t> smart_hi = ctx;
00132 
00133         ASSERT(ctx->fn, "null handler fn?");
00134         ASSERT(ctx->state.story, "null story?");
00135         ASSERT(ctx->state.updates, "null updates interface");
00136 
00137         ctx->fn(ctx->state);
00138 }
00139 
00140 
00141 
00142 ////////////////////////////////////////////////////////////////////////////////
00143 //
00144 //      Objects to construct network messages (used for network request queue).
00145 //
00146 ////////////////////////////////////////////////////////////////////////////////
00147 
00148 class QueryContext : public netrq::Request {
00149 public:
00150         QueryContext(IN dword_t objectId) throw() {
00151                         sprintf(m_id, "qr%ld", (long) objectId);
00152                         m_objectId = objectId;
00153                 }
00154         ~QueryContext(void) throw() { }
00155 
00156         void write(IO xdrbuf::Output * output) {
00157                         // get this physics object
00158                         smart_ptr<PhysicsObject> obj =
00159                             getPhysicsObjectById(m_objectId);
00160                         if (!obj) {
00161                                 DPRINTF("Client asked for object not there");
00162 
00163                                 // object is destroyed!
00164                         //      output->addInt32Packlet('d', &m_objectId, 1);
00165                         } else {
00166                                 DPRINTF("Telling client about object %ld",
00167                                     (long) m_objectId);
00168                                 smart_ptr<Instance> instance =
00169                                     getInstanceFromPhysicsObject(obj);
00170                                 if (!instance)
00171                                         return;
00172 
00173                                 const char * typeId = instance->getTypeId();
00174                                 int tiLength = strlen(typeId);
00175 
00176                                 point3d_t pos = obj->getPosition();
00177                                 //pos.dump("  position");
00178 
00179                                 // look up map ID
00180                                 MapDynamics * dyn =
00181                                     getMapDynamicsFromPhysicsObject(obj);
00182                                 if (!dyn) {
00183                                         return;
00184                                 }
00185 
00186                                 Map * map = dyn->getMap();
00187                                 ASSERT(map, "null");
00188 
00189                                 const char * mapId = map->getId();
00190                                 int miLength = strlen(mapId);
00191 
00192                                 // for now, we just provide type ID and mapID
00193                                 output->openPacklet('q');       // query response
00194                                 output->addInt32Packlet('i',
00195                                     (int32_t *) &m_objectId, 1);
00196                                 output->addStringPacklet(
00197                                     't', typeId, tiLength);
00198                                 output->addStringPacklet(
00199                                     'm', mapId, miLength);
00200                                 output->closePacklet('q');
00201                         }
00202                 }
00203 
00204         int getMaxBytes(void) const throw() {
00205                         return 255 + 255 + 4 + 4 + 4 * 2;
00206                 }
00207 
00208         const char * getId(void) const throw() { return m_id; }
00209 
00210 private:
00211         char            m_id[32];
00212         dword_t         m_objectId;
00213 };
00214 
00215 
00216 
00217 class AuthContext : public netrq::Request {
00218 public:
00219         AuthContext(IN dword_t token,
00220                     IN const char * key) {
00221                         ASSERT(token, "null");
00222                         ASSERT(key, "null");
00223                         sprintf(m_id, "auth%ld", (long) token);
00224                         m_token = token;
00225                         m_encryptedDesKey = key;
00226                 }
00227         ~AuthContext(void) throw() { }
00228 
00229         void write(IN xdrbuf::Output * output) {
00230                         ASSERT(output, "null");
00231 
00232                         output->openPacklet('c');
00233                         output->addInt32Packlet('t',
00234                             (int32_t *) &m_token, 1);
00235                         output->openPacklet('k');
00236 
00237                         const char * key = m_encryptedDesKey.c_str();
00238                         int len = m_encryptedDesKey.length();
00239                         while (len) {
00240                                 int send = len;
00241                                 if (send > 255) {
00242                                         send = 255;
00243                                 }
00244                                 output->addStringPacklet('a', key, send);
00245                                 key += send;
00246                                 len -= send;
00247                         }
00248                         output->closePacklet('k');
00249                         output->closePacklet('c');
00250                 }
00251 
00252         int getMaxBytes(void) const throw() {
00253                         return 4 + 512 + 5 * 2;
00254                 }
00255 
00256         const char * getId(void) const throw() { return m_id; }
00257 
00258 private:
00259         char            m_id[32];
00260         dword_t         m_token;
00261         std::string     m_encryptedDesKey;
00262 };
00263 
00264 
00265 
00266 class ObjectExistsResponse : public netrq::Request {
00267 public:
00268         ~ObjectExistsResponse(void) throw() { }
00269 
00270         // netrq::Request class interface methods ------------------------------
00271         const char * getId(void) const throw() { return m_requestId; }
00272         int getMaxBytes(void) const throw() { return 6; }
00273         void write(IO xdrbuf::Output * output) {
00274                         ASSERT(output, "null");
00275 
00276                         // well, does our object exist?
00277                         smart_ptr<PhysicsObject> obj =
00278                             getPhysicsObjectById(m_objectId);
00279                         if (obj && obj->getId()) {
00280                                 // yes, object exists and is healthy!
00281                                 return;         // nothing to do
00282                         }
00283 
00284                         // object is destroyed!
00285                         output->addInt32Packlet('d',
00286                             (int32_t *) &m_objectId, 1);
00287                 }
00288 
00289         // static helper methods -----------------------------------------------
00290         static smart_ptr<netrq::Request> create(IN const char * requestId,
00291                                 IN long objectId) {
00292                         ASSERT(requestId, "null");
00293                         ASSERT(objectId, "null");
00294 
00295                         smart_ptr<ObjectExistsResponse> local =
00296                             new ObjectExistsResponse;
00297                         ASSERT(local, "out of memory");
00298 
00299                         strcpy(local->m_requestId, requestId);
00300                         local->m_objectId = objectId;
00301 
00302                         return local;
00303                 }
00304 private:
00305         // private helper methods ----------------------------------------------
00306         ObjectExistsResponse(void) throw() { }
00307 
00308         // private member data -------------------------------------------------
00309         char            m_requestId[64];
00310         dword_t         m_objectId;
00311 };
00312 
00313 
00314 
00315 ////////////////////////////////////////////////////////////////////////////////
00316 //
00317 //      ServerImpl
00318 //
00319 ////////////////////////////////////////////////////////////////////////////////
00320 
00321 class ServerImpl : public StateUpdates,
00322                         public Server,
00323                         public converse::ConversationRouter {
00324 public:
00325         ServerImpl(void) throw();
00326         ~ServerImpl(void) throw() { }
00327 
00328         // public member functions ---------------------------------------------
00329         void initialize(IN smart_ptr<Datahash>& params,
00330                                 IN smart_ptr<ServerGameLogic>& gameLogic);
00331 
00332         // aesop::Server class interface methods ------------------------------
00333         int exec(void);
00334         void requestStopTS(IN int retval);
00335 
00336         // aesop::StateUpdates class interface methods ------------------------
00337         //      REMEMBER: these must be threadsafe!
00338         void enqueueMessageTS(IN conn_id_t connId,
00339                                 IN smart_ptr<MessageBuffer>& message);
00340         void newTcpConnectionTS(IN conn_id_t connId,
00341                                 IN long token);
00342         void notifyDialogReplyTS(IN conn_id_t conn_id,
00343                                 IN const char * conversationGuid,
00344                                 IN int dialogId,
00345                                 IN int playerId,
00346                                 IN const Datahash * reply);
00347         bool setPlayerUserAccountTS(IN conn_id_t conn_id,
00348                                 IN int playerId,
00349                                 IN const char * username,
00350                                 IN const char * usertag,
00351                                 OUT std::string& diagnostic);
00352         void newGameTS(IN conn_id_t connId,
00353                                 IN int playerId);
00354         void requestShutdownTS(void);
00355 
00356         // converse::ConversationRouter class interface methods ----------------
00357         void routeConversationTS(IN const char * guid,
00358                                 IN int dialogId,
00359                                 IN int playerId,
00360                                 IN conn_id_t connId,
00361                                 IN const char * dialogData);
00362 
00363 private:
00364         // private typedefs ----------------------------------------------------
00365         struct request_t {
00366                 // constructor, manipulators
00367                 request_t(void) throw() { this->clear(); }
00368                 void clear(void) throw() {
00369                                 conn_id = 0;
00370                                 message = NULL;
00371                         }
00372                 bool is_empty(void) const throw() {
00373                                 return (!conn_id && !message);
00374                         }
00375 
00376                 // data fields
00377                 conn_id_t                       conn_id;
00378                 smart_ptr<MessageBuffer>        message;
00379         };
00380         typedef threadsafe_queue<request_t> msg_queue_t;
00381 
00382         // private helper methods ----------------------------------------------
00383         void checkMessages(IN long microseconds);
00384         void handleMessage(IN const envelope_t& env,
00385                                 IN const MessageBuffer * msg);
00386         void handleUdp(IN const envelope_t& env,
00387                                 IN const MessageBuffer * msg);
00388         void parseHostConnectUdp(IN const netlib::envelope_t& env,
00389                                 IO xdrbuf::Input * input);
00390         void parseHostAuthUdp(IN host_rec_t& hr,
00391                                 IN const xdrbuf::PackletHeader& ph,
00392                                 IO xdrbuf::Input * input);
00393         void parsePlayerUdp(IN host_rec_t& hr,
00394                                 IO xdrbuf::Input * input);
00395         void parseQueryUdp(IN host_rec_t& hr,
00396                                 IN const xdrbuf::PackletHeader& ph,
00397                                 IO xdrbuf::Input * input);
00398         void parseGameUdp(IN host_rec_t& hr,
00399                                 IN const xdrbuf::PackletHeader& ph,
00400                                 IO xdrbuf::Input * input);
00401         void parseExistsUdp(IN host_rec_t& hr,
00402                                 IO xdrbuf::Input * input);
00403         void sendUdp(void);
00404         const char * getStoryGuidTS(void);
00405         void newPlayer(IN host_rec_t& hr, IN int playerId);
00406         bool isAdmin(IN conn_id_t udpConnID, IN int playerId);
00407         void checkPlayers(void);
00408         void sendBroadcast(void);
00409 
00410         // private member data -------------------------------------------------
00411         smart_ptr<story::Story>         m_story;
00412         smart_ptr<MessageRouter>        m_router;
00413         smart_ptr<threadpool::Pool>     m_threadpool;
00414         smart_ptr<Datahash>             m_params;
00415         smart_ptr<converse::ConversationManager> m_conversationMgr;
00416         smart_ptr<UserManager>          m_userMgr;
00417         smart_ptr<PlayerManager>        m_playerMgr;
00418         smart_ptr<HostManager>          m_hostMgr;
00419         smart_ptr<MapManager>           m_mapMgr;
00420         smart_ptr<ServerGameLogic>      m_gameLogic;
00421         smart_ptr<xdrbuf::Input>        m_xdrInput;     // input parsing
00422         smart_ptr<xdrbuf::Output>       m_xdrBcast;     // broadcast output
00423         msg_queue_t                     m_outboundMessages;
00424         float                           m_requestedHz;  // main loop cycles
00425         float                           m_udpHz;        // UDP frequency
00426         bool                            m_shutdown;
00427         int                             m_retval;
00428         conn_id_t                       m_tcpConn;      // TCP listening socket
00429         int                             m_udpPort;      // clients send UDP here
00430         int                             m_tcpPort;      // clients send TCP here
00431         conn_id_t                       m_udpConn;      // listening connection
00432         conn_id_t                       m_udpBcast;     // broadcast UDP point
00433         threadsafe_refcount             m_workerTrc;    // refcount of workers
00434         name_map_t                      m_threadNames;
00435         int                             m_threadCounter;
00436         eServerMode                     m_smode;
00437         dword_t                         m_udpClock;     // clock for UDP messages
00438         int                             m_bcastDelay;   // seconds between bcasts
00439         std::string                     m_publicName;   // server display name
00440 };
00441 
00442 
00443 
00444 ServerImpl::ServerImpl(void)
00445 throw()
00446 {
00447         m_shutdown = false;
00448         m_retval = 0;
00449         m_threadCounter = 0;
00450         m_requestedHz = 0;
00451         m_udpHz = 0;
00452         m_udpPort = 0;
00453         m_tcpPort = 0;
00454         m_udpConn = 0;
00455         m_tcpConn = 0;
00456         m_udpBcast = 0;
00457         m_smode = eServerMode_Invalid;
00458         m_udpClock = 0;
00459         m_bcastDelay = 4;       // make this config driven?
00460 }
00461 
00462 
00463 
00464 void
00465 ServerImpl::initialize
00466 (
00467 IN smart_ptr<Datahash>& params,
00468 IN smart_ptr<ServerGameLogic>& gameLogic
00469 )
00470 {
00471         ASSERT(params, "null");
00472         ASSERT(gameLogic, "null");
00473 
00474         DPRINTF("In Server::initialize()...");
00475 
00476         // save parameters
00477         m_params = params;
00478 
00479         // game logic object
00480         m_gameLogic = gameLogic;
00481 
00482         // create xdr input parser for UDP packets
00483         ASSERT(!m_xdrInput, "Alread have an input parser?");
00484         m_xdrInput = xdrbuf::Input::create();
00485         ASSERT(m_xdrInput, "Failed to create xdr input parser");
00486 
00487         // create xdr output buffer for UDP broadcasts
00488         ASSERT(!m_xdrBcast, "Already have an output buffer");
00489         m_xdrBcast = xdrbuf::Output::create(s_bcastBuffer);
00490         ASSERT(m_xdrBcast, "null");
00491 
00492         // requested server loop speed?
00493         m_requestedHz = getFloat(params, "requestedHz");
00494         if (m_requestedHz < 1.0 || m_requestedHz > 200.0) {
00495                 WAVE_EX(wex);
00496                 wex << "Invalid requested Hz: " << m_requestedHz;
00497                 wex << ". Should be be between 1 and 200 Hz";
00498         }
00499 
00500         // cycles per UDP send?
00501         m_udpHz = getFloat(params, "udpHz");
00502         if (m_udpHz < 1 || m_udpHz > 20) {
00503                 WAVE_EX(wex);
00504                 wex << "Invalid udp Hz: " << m_udpHz;
00505         }
00506 
00507         int udpSize = getInt(params, "udpSize");
00508         if (udpSize < 64 || udpSize > 1024) {
00509                 WAVE_EX(wex);
00510                 wex << "Invalid udp packet size: " << udpSize;
00511         }
00512 
00513         // default locale (see TODO file regarding locales!)
00514         const char * locale = i18n::getHostLocale();
00515         ASSERT_THROW(locale, "failed to determine host locale?");
00516         DPRINTF("Using locale: '%s'", locale);
00517 
00518         // load story
00519         const char * story_dir = getString(params, "storyDir");
00520         m_story = story::Story::create(locale, story_dir);
00521         ASSERT(m_story, "failed to load story");
00522 
00523         // initialize thread pool
00524         smart_ptr<Datahash> pool_params = getSubhash(params, "threadpool");
00525         ASSERT(pool_params, "should have subhash for threadpool init");
00526         m_threadpool = threadpool::Pool::create(pool_params);
00527         ASSERT(m_threadpool, "failed to create thread pool for server");
00528 
00529         // figure out where to broadcast
00530         // Our "best guess" is the last non-loopback interface found
00531         netlib::map_ip_addr_t ips;
00532         netlib::getLocalInterfaces(ips);
00533         netlib::ip_addr_t bestGuess;
00534         DPRINTF("Checking server interfaces:");
00535         for (netlib::map_ip_addr_t::const_iterator i = ips.begin();
00536              i != ips.end(); ++i) {
00537                 const char * name = i->first.c_str();
00538                 const netlib::ip_addr_t& ip = i->second;
00539 
00540                 ip.dump(name);
00541                 if (!ip.isLoopback()) {
00542                         bestGuess = ip;
00543                 }
00544         }
00545         if (!bestGuess.isValid()) {
00546                 DPRINTF("No non-loopback interfaces available!");
00547                 DPRINTF("Server is using loopback interface only");
00548                 bestGuess.setLoopback();
00549         }
00550         bestGuess.setBroadcast();
00551 
00552         // broadcast data
00553         netlib::address_t bcastAddress;
00554         int clientPort = eDefaultPort_clientUdp;
00555         const char * clientPortStr =
00556             getString(params, "clientPort", eDatahash_Optional);
00557         if (!clientPortStr) {
00558                 DPRINTF("No client UDP port specified, using default: %d",
00559                     clientPort);
00560         } else {
00561                 DPRINTF("Overriding default client port of %d", clientPort);
00562                 clientPort = atoi(clientPortStr);
00563                 DPRINTF("  with new port value: %d", clientPort);
00564                 ASSERT2(clientPort > 0, "Bad client udp port: " << clientPort);
00565         }
00566         const char * bcastIP =
00567             getString(params, "broadcastAddress", eDatahash_Optional);
00568         if (!bcastIP) {
00569                 DPRINTF("No broadcastAddress specified, using auto-detected address");
00570                 bcastAddress.ip = bestGuess;
00571                 bcastAddress.port = clientPort;
00572         } else {
00573                 DPRINTF("Overriding autodetected broadcast address");
00574                 bestGuess.dump("autodetected broadcast address");
00575                 bcastAddress.set(bcastIP, clientPort);
00576         }
00577         bcastAddress.dump("UDP broadcast address");
00578         m_publicName = getString(params, "publicName");
00579 
00580         // udp ports
00581         m_udpPort = getInt(params, "udpPort");
00582         ASSERT(m_udpPort > 0, "Bad udp port: %d", m_udpPort);
00583         ASSERT(!m_udpConn, "already have a UDP listener?");
00584         netlib::address_t udpAddress;
00585         udpAddress.setlocal(m_udpPort);
00586         udpAddress.dump("UDP address");
00587 
00588         m_udpConn = netlib::createUdpLocal(udpAddress);
00589         ASSERT(m_udpConn, "null");
00590         ASSERT(!m_udpBcast, "already have a UDP broadcaster?");
00591         m_udpBcast = netlib::createUdpBroadcast(bcastAddress);
00592         ASSERT2(m_udpBcast, "null");
00593 
00594         // message router
00595         m_router = MessageRouter::create();
00596         ASSERT(m_router, "Failed to create message router");
00597 
00598         // conversation manager
00599         m_conversationMgr = converse::ConversationManager::create(this);
00600         ASSERT(m_conversationMgr, "failed to create conversation manager");
00601 
00602         // user manager
00603         m_userMgr = UserManager::create(params);
00604         ASSERT(m_userMgr, "failed to create user manager");
00605 
00606         // player manager
00607         m_playerMgr = PlayerManager::create(m_userMgr);
00608         ASSERT(m_playerMgr, "failed to create player manager");
00609 
00610         // host manager
00611         m_hostMgr = HostManager::create(udpSize);
00612         ASSERT(m_hostMgr, "failed to create host manager");
00613 
00614         // map manager
00615         m_mapMgr = MapManager::create(params, m_story);
00616         ASSERT(m_mapMgr, "failed to create map manager");
00617 
00618         // hook up game logic object
00619         m_gameLogic->setPlayerManager(m_playerMgr);
00620         m_gameLogic->setMapManager(m_mapMgr);
00621 
00622         // set up tcp listener
00623         m_tcpPort = getInt(params, "tcpPort");
00624         ASSERT(m_tcpPort > 0, "Bad listening port: %d", m_tcpPort);
00625         int backlog = getInt(params, "tcpBacklog");
00626         ASSERT(backlog > 0, "Bad max backlog: %d", backlog);
00627         ASSERT(!m_tcpConn, "already have a tcp listener?");
00628         netlib::address_t tcpAddress;
00629         tcpAddress.setlocal(m_tcpPort);
00630         tcpAddress.dump("TCP Address");
00631         m_tcpConn = netlib::createTcpListener(tcpAddress, backlog);
00632         ASSERT(m_tcpConn, "failed to create tcp listener socket");
00633 
00634         // initialize type instance library
00635         initializeTypeInstanceLibrary(m_story);
00636 
00637         // successful initialization
00638         m_smode = eServerMode_Idle;
00639 }
00640 
00641 
00642 /// \ingroup aesop_srv
00643 /// \defgroup aesop_srv_loop Core Processing Loop
00644 ///
00645 /// The Server::exec() call starts the server loop, which runs until the
00646 /// server is shut down.
00647 ///
00648 /// Anything happening here will have immediate (and painful) impact on
00649 ///   all connected clients (lag times, etc.).  So make sure only core
00650 ///   tasks are here, and that they happen quickly.
00651 ///
00652 /// Here is what should happen in the loop:
00653 ///  -# handle any UDP updates from clients (this is quick game state
00654 ///        updates).
00655 ///  -# update all loaded maps (next time tic)
00656 ///  -# send state update back to all interested clients (UDP)
00657 ///
00658 /// Some examples of things that should NOT happen in this loop:
00659 ///   - opening/reading/writing to files or sockets
00660 ///       (so for instance, saves + loads must happen elsewhere)
00661 ///   - calculations that crunch all state (except tics, above)
00662 ///   - TCP request handling
00663 ///
00664 
00665 // TODO: there is one weirdness with the networking library.  Based
00666 // on the core messaging loop, TCP reads and sends still happen on this
00667 // thread.  At the moment, this is considered desirable because some
00668 // important connection state is shared for all threads, so it may
00669 // make sense to keep the low-level state and routing on the main
00670 // thread rather than having a lot of locking.  If this turns into a
00671 // performance issue then even the low level TCP read/writes should be
00672 // moved off this thread.  This isn't expected to be an issue because
00673 // all TCP reads/writes are non-blocking.
00674 
00675 
00676 int
00677 ServerImpl::exec
00678 (
00679 void
00680 )
00681 {
00682         perf::Timer timer("aesop-srv::main-loop");
00683 
00684         // given requested Hz (loops/second), determine microseconds per loop
00685         ASSERT(m_requestedHz > 0 && m_requestedHz < 1000,
00686             "Bad server frequency?  %f hz", m_requestedHz);
00687         int loop_us = (int) (1.0e6 / m_requestedHz);
00688         int max_usec = loop_us + 1000;  // maximum allowed value
00689         loop_us -= 1000;        // take off another ms for good measure
00690         DPRINTF("Server loop requested to run at %.1f hz", m_requestedHz);
00691         DPRINTF("  That means %d microseconds per loop", loop_us);
00692         ASSERT(loop_us > 0 && loop_us < 1000 * 1000,
00693             "Bad loop microseconds? %d", loop_us);
00694 
00695         // how many loop cycles per UDP send?
00696         DPRINTF("UDP packets requested at %.1f hz", m_udpHz);
00697         int udpSend = (int) ((m_requestedHz / m_udpHz) + 0.5);
00698         DPRINTF("Sending udp packets every %d cycles", udpSend);
00699 
00700         // time delta per loop
00701         perf::time_t dt(0, loop_us);
00702 
00703         // time delta between broadcasts
00704         ASSERT2(m_bcastDelay > 0, "Bad delay: " << m_bcastDelay);
00705         perf::time_t bcastDt(m_bcastDelay, 0);
00706 
00707         // target time for next loop (now + dt)
00708         perf::time_t target = perf::getNow();
00709         target.increment(dt);
00710 
00711         // target time for next broadcast
00712         perf::time_t targetBcast = perf::getNow();
00713         targetBcast.increment(bcastDt);
00714 
00715         perf::time_t benchmark = perf::getNow();
00716         static const int s_benchmarkCount = (int) (3 * m_requestedHz);
00717 
00718         // main loop
00719         int udpCount = 0;
00720         perf::time_t lastLoopTime = perf::getNow();
00721         for (long loop_counter = 0; !m_shutdown; ++loop_counter, udpCount++) {
00722 
00723                 // what time is now?
00724                 perf::time_t now = perf::getNow();
00725                 perf::time_t delta = now;
00726                 delta.decrement(lastLoopTime);
00727                 lastLoopTime = now;
00728 
00729                 // need to broadcast?
00730                 if (now.isGreaterThan(targetBcast)) {
00731                         this->sendBroadcast();
00732                         targetBcast = now;
00733                         targetBcast.increment(bcastDt);
00734                 }
00735 
00736                 // recalibrate time delta.  This loop will try to stay close
00737                 //   to the desired Hz rate.
00738                 if (s_benchmarkCount == loop_counter) {
00739                         loop_counter = 0;
00740                         perf::time_t delta = now;
00741                         delta.decrement(benchmark);
00742                         double delta_t = delta.getSeconds();
00743                         if (delta_t > 0.01) {
00744                                 double hz = s_benchmarkCount / delta_t;
00745                                 double dr = (hz / m_requestedHz) - 1.0;
00746 
00747                         //      DPRINTF("%6.2lf Hz server loop", hz);
00748 
00749                                 double s = dt.getSeconds();
00750                                 s *= 1.0 + 0.15 * dr;           // s is now requested dt
00751                                 int usec = (int) (1000 * 1000 * s);
00752                                 if (usec >= 0 && usec < max_usec) {
00753                                         perf::time_t new_dt(0, usec);
00754                                         dt = new_dt;
00755                                 }
00756                         }
00757                         benchmark = now;
00758                 }
00759 
00760                 // check on player state
00761                 this->checkPlayers();
00762 
00763                 // update all game state
00764                 float delta_seconds = (float) delta.getSeconds();
00765                 if (delta_seconds < 1.0e-6) {
00766                         // force to be a small positive number
00767                         delta_seconds = 1.0e-6;
00768                 }
00769                 m_mapMgr->tickMaps(delta_seconds);
00770 
00771                 // let the game logic run
00772                 {
00773                         perf::Timer timer("game-logic-tick");
00774                         m_gameLogic->tick(delta_seconds);
00775                 }
00776 
00777                 // send udp messages?
00778                 if (udpCount >= udpSend) {
00779                         udpCount = 0;
00780                         try {
00781                                 this->sendUdp();
00782                         } catch (std::exception& e) {
00783                                 DPRINTF("Failed to send UDP packets: %s",
00784                                         e.what());
00785                         }
00786                 }
00787 
00788                 // message pump
00789                 while (true) {
00790                         perf::time_t now = perf::getNow();
00791                         if (now.isGreaterThan(target)) {
00792                                 target = now;
00793                                 target.increment(dt);
00794                                 break;
00795                         }
00796 
00797                         // send an enqueued message
00798                         request_t req;
00799                         if (m_outboundMessages.pop_front(req)) {
00800                                 ASSERT(!req.is_empty(),
00801                                     "empty message in queue");
00802                                 netlib::enqueueMessage(req.conn_id,
00803                                     req.message);
00804                         }
00805 
00806                         // read messages
00807                         perf::time_t delta;
00808                         delta = target;
00809                         delta.decrement(now);
00810                         this->checkMessages(delta.getMicroseconds());
00811                 }
00812         }
00813         DPRINTF("Shutting down server!");
00814 
00815         // we need to wait until all worker threads have exited
00816         DPRINTF("Waiting for worker threads to exit...");
00817         for (;;) {
00818                 long lCount = m_workerTrc.getRefCount();
00819                 if (lCount < 2) {
00820                         break;          // all done!
00821                 }
00822                 DPRINTF("  Waiting for %ld workers to exit...", lCount - 1);
00823 
00824                 // debugging: who is working?
00825                 name_map_t::iterator_t i;
00826                 m_threadNames.getIterator(i);
00827                 int nThread;
00828                 std::string name;
00829                 while (m_threadNames.getNextElement(i, nThread, name)) {
00830                         DPRINTF("    Thread %3d = %s", nThread, name.c_str());
00831                 }
00832 
00833                 // TODO: could force exit after 30 seconds etc...
00834                 sleep(1);
00835         }
00836 
00837         // any closing stuff?
00838         netlib::dumpStats();
00839 
00840         // all done
00841         return m_retval;
00842 }
00843 
00844 
00845 
00846 void
00847 ServerImpl::requestStopTS
00848 (
00849 IN int retval
00850 )
00851 {
00852         // caller is requesting that we stop
00853         // this method needs to be threadsafe without any locking!  This is
00854         // typically used by signal handlers etc.
00855 
00856         // these fields are safe to be set by an arbitrary thread, since we
00857         // don't care about race conditions on reading
00858         m_retval = retval;
00859         m_shutdown = true;
00860 }
00861 
00862 
00863 
00864 ////////////////////////////////////////////////////////////////////////////////
00865 //
00866 //      ServerImpl -- aesop::StateUpdates class interface methods
00867 //
00868 //      All methods should be threadsafe, hence the TS suffix.
00869 //
00870 ////////////////////////////////////////////////////////////////////////////////
00871 
00872 void
00873 ServerImpl::enqueueMessageTS
00874 (
00875 IN conn_id_t connId,
00876 IN smart_ptr<MessageBuffer>& message
00877 )
00878 {
00879         ASSERT(connId, "null");
00880         ASSERT(message, "null");
00881 
00882         // threadsafe!  add to our threadsafe list
00883         request_t req;
00884         req.conn_id = connId;
00885         req.message = message;
00886         m_outboundMessages.push_back(req);
00887 }
00888 
00889 
00890 
00891 void
00892 ServerImpl::newTcpConnectionTS
00893 (
00894 IN conn_id_t connId,
00895 IN long token
00896 )
00897 {
00898         ASSERT(connId, "null");
00899         ASSERT(token, "null");
00900 
00901         // must be threadsafe!
00902 
00903         // see if we can find the host based on the token
00904         host_rec_t hr;
00905         if (!m_hostMgr->getHostByUdpConnectionID(token, hr)) {
00906                 DPRINTF("Unrecognized host in new tcp connection");
00907                 return;
00908         }
00909 
00910         // have a host--does it match the IP?
00911 //      netlib::connection_info_t ci1, ci2;
00912 
00913 //      netlib::getConnectionInfo(token, ci1);
00914 //      netlib::getConnectionInfo(connId, ci2);
00915 
00916 //      ci1.dump("Local host information");
00917 //      ci2.dump("Remote host information");
00918 
00919         // for now, just compare IP addresses
00920 //      if (ci1.address.ip != ci2.address.ip) {
00921 //              DPRINTF("TCP connect info does not match UDP?");
00922 //              return;         // disallow TCP connection
00923 //      }
00924 
00925         // okay, valid!
00926         hr.tcpConn = connId;
00927         m_hostMgr->updateHost(hr);
00928 }
00929 
00930 
00931 
00932 void
00933 ServerImpl::newPlayer
00934 (
00935 IN host_rec_t& hr,
00936 IN int playerId
00937 )
00938 {
00939         ASSERT(hr.isValid(), "invalid");
00940         ASSERT(playerId > 0, "Bad player ID: %d", playerId);
00941 
00942         // NOTE: this entire logic chain needs to be idempotent!
00943 
00944         // this routine must be threadsafe!
00945         // this happens right now because hostMgr and playerMgr are threadsafe
00946 
00947         // only allowed if host is in good state
00948         if (eHostMode_Authorized != hr.mode) {
00949                 DPRINTF("Host is not authorized to add players");
00950                 return;
00951         }
00952 
00953         // also only allowed if host has TCP connection
00954         if (!hr.tcpConn) {
00955                 DPRINTF("Cannot add players: no tcp connection");
00956                 return;
00957         }
00958 
00959         // add player!
00960         player_rec_t pr;
00961         pr.udpConnId = hr.udpConn;
00962         pr.playerId = playerId;
00963         m_playerMgr->updatePlayer(pr);
00964 
00965         // fire up a new player conversation
00966         std::ostringstream oss;
00967         oss << "new-player:" << pr.udpConnId << ":" << playerId;
00968 
00969         // create conversation host
00970         new_player_conv_data_t data;
00971         data.updates = this;
00972         data.userMgr = m_userMgr;
00973         data.connId = hr.tcpConn;
00974         data.playerId = playerId;
00975         ASSERT(data.isValid(), "should be valid now");
00976 
00977         smart_ptr<converse::ConversationHost> host =
00978             createNewPlayerConversationHost(hr.desKey, data);
00979         ASSERT(host, "null");
00980 
00981         // start up conversation
00982         ASSERT(m_conversationMgr, "null");
00983         m_conversationMgr->updateConversationTS(oss.str().c_str(), hr.tcpConn,
00984             playerId, host);
00985 }
00986 
00987 
00988 
00989 void
00990 ServerImpl::notifyDialogReplyTS
00991 (
00992 IN conn_id_t conn_id,
00993 IN const char * conversationGuid,
00994 IN int dialogId,
00995 IN int playerId,
00996 IN const Datahash * reply
00997 )
00998 {
00999         ASSERT(conn_id, "Null");
01000         ASSERT(conversationGuid, "null");
01001         ASSERT(dialogId > 0, "Bad dialog id: %d", dialogId);
01002         ASSERT(playerId > 0, "Bad player id: %d", playerId);
01003         ASSERT(reply, "null");
01004 
01005         // threadsafe!
01006         // use threadsafe conversation manager
01007         ASSERT(m_conversationMgr, "null");
01008         m_conversationMgr->handleDialogReplyTS(conversationGuid,
01009                                         dialogId,
01010                                         conn_id,
01011                                         playerId,
01012                                         reply);
01013 }
01014 
01015 
01016 
01017 bool
01018 ServerImpl::setPlayerUserAccountTS
01019 (
01020 IN conn_id_t conn_id,
01021 IN int playerId,
01022 IN const char * username,
01023 IN const char * usertag,
01024 OUT std::string& diagnostic
01025 )
01026 {
01027         ASSERT(conn_id, "null");
01028         ASSERT(playerId > 0, "bad player id: %d", playerId);
01029         ASSERT(username, "null");
01030         ASSERT(usertag, "null");
01031         ASSERT(m_userMgr, "null");
01032         diagnostic.clear();
01033 
01034         // check about admin first (MUST happen before lock, disk hit)
01035         bool isAdmin = m_userMgr->isAdminTS(username);
01036         if (isAdmin) {
01037                 DPRINTF("User '%s' is an admin!", username);
01038         }
01039 
01040         // get host
01041         host_rec_t hr;
01042         if (!m_hostMgr->getHostByTcpConnectionID(conn_id, hr)) {
01043                 DPRINTF("Cannot set player user account: bad host");
01044                 return false;
01045         }
01046 
01047         // get player record
01048         player_rec_t pr;
01049         if (!m_playerMgr->getPlayerByHostAndPlayerId(hr.udpConn, playerId, pr)) {
01050                 diagnostic = "Internal Error: No record for this player!";
01051                 return false;
01052         }
01053 
01054         // update local copy
01055         pr.username.set(username);
01056         pr.playerTag.set(usertag);
01057         if (isAdmin) {
01058                 pr.flags |= ePrivilege_Admin;
01059         }
01060 
01061         // update player record
01062         m_playerMgr->updatePlayer(pr);
01063 
01064         // allowed!
01065         return true;
01066 }
01067 
01068 
01069 
01070 void
01071 ServerImpl::newGameTS
01072 (
01073 IN conn_id_t connId,
01074 IN int playerId
01075 )
01076 {
01077         ASSERT(connId, "null");
01078         ASSERT(playerId > 0, "Bad player id: %d", playerId);
01079 
01080         // threadsafe!
01081 
01082         // get host
01083         host_rec_t hr;
01084         if (!m_hostMgr->getHostByTcpConnectionID(connId, hr)) {
01085                 DPRINTF("new game request ignored: bad host?");
01086                 return;
01087         }
01088         conn_id_t hostId = hr.udpConn;
01089 
01090         // first verify this is a valid admin
01091         // no lock needed
01092         if (!this->isAdmin(hr.udpConn, playerId)) {
01093                 DPRINTF("Player %d is not admin, cannot start game",
01094                     playerId);
01095                 return;
01096         }
01097 
01098         // okay!
01099         DPRINTF("playerId=%d from connection 0x%04lx has requested a new game",
01100             playerId, (long) hostId);
01101 
01102         // nuke all existing maps!
01103         m_mapMgr->clear();
01104 
01105         // request that starting map be loaded
01106         std::string id = m_story->getStartingMapId();
01107         DPRINTF("  Starting map id='%s'", id.c_str());
01108         m_mapMgr->requestLoad(id.c_str());
01109 }
01110 
01111 
01112 
01113 void
01114 ServerImpl::requestShutdownTS
01115 (
01116 void
01117 )
01118 {
01119         // This needs to be threadsafe!
01120         // However, we can cheat here.  the shutdown flag is special:
01121         //   it is always false, until it goes true, and stays true
01122         // So we can just set it to true here, and not worry if another
01123         //   thread reads it as false at the same time.  Other threads
01124         //   will eventually figure out we are shutting down...
01125 
01126         DPRINTF("Requested to shut down!  Setting shutdown flag...");
01127         m_shutdown = true;
01128 }
01129 
01130 
01131 
01132 ////////////////////////////////////////////////////////////////////////////////
01133 //
01134 //      ServerImpl -- converse::ConversationRouter class interface methods
01135 //
01136 ////////////////////////////////////////////////////////////////////////////////
01137 
01138 void
01139 ServerImpl::routeConversationTS
01140 (
01141 IN const char * guid,
01142 IN int dialogId,
01143 IN int playerId,
01144 IN conn_id_t connId,
01145 IN const char * dialogData
01146 )
01147 {
01148         ASSERT(guid, "null");
01149         ASSERT(dialogId >= 0, "Bad dialog id: %d", dialogId);
01150         ASSERT(playerId > 0, "Bad player id: %d", playerId);
01151         ASSERT(connId, "null");
01152         ASSERT(dialogData, "null");
01153 
01154         smart_ptr<MessageBuffer> msg;
01155         // threadsafe!
01156         // end of conversation?
01157         if (!dialogId) {
01158                 DPRINTF("Ending conversation: %s", guid);
01159                 msg = createTerminateConversationMessage(playerId, guid);
01160         } else {
01161                 // construct dialog message
01162                 msg = createConversationDialogMessage(playerId, guid,
01163                     dialogId, dialogData);
01164         }
01165         ASSERT(msg, "null");
01166 
01167         // threadsafe -- enqueue
01168         this->enqueueMessageTS(connId, msg);
01169 }
01170 
01171 
01172 
01173 ////////////////////////////////////////////////////////////////////////////////
01174 //
01175 //      ServerImpl -- private helper methods
01176 //
01177 ////////////////////////////////////////////////////////////////////////////////
01178 
01179 void
01180 ServerImpl::checkMessages
01181 (
01182 IN long microseconds
01183 )
01184 {
01185         perf::Timer timer("aesop-srv::checkMessages");
01186         ASSERT(microseconds >= 0, "Bad microseconds: %ld", microseconds);
01187 
01188         envelope_t env;
01189         smart_ptr<MessageBuffer> msg;
01190         if (!netlib::getNextMessage(microseconds, env, msg))
01191                 return;         // nothing to do!
01192 
01193         try {
01194                 // what kind of message?
01195                 if (netlib::eType_UDPRemote == env.type) {
01196                         // udp packet from client
01197                         this->handleUdp(env, msg);
01198                 } else if (netlib::eType_TCP == env.type) {
01199                         // tcp packet from client
01200                         this->handleMessage(env, msg);
01201                 } else {
01202                         DPRINTF("Unknown message type!  %d", env.type);
01203                 }
01204 
01205         } catch (std::exception& e) {
01206                 dumpMessage(std::cerr, "Exception handling this message",
01207                     env, msg);
01208                 DPRINTF("Exception: %s", e.what());
01209         }
01210 }
01211 
01212 
01213 
01214 void
01215 ServerImpl::handleMessage
01216 (
01217 IN const envelope_t& env,
01218 IN const MessageBuffer * msg
01219 )
01220 {
01221         perf::Timer timer("aesop-srv::handleMessage");
01222         ASSERT(!env.is_empty(), "empty envelope");
01223         ASSERT(msg, "null");
01224 
01225         // TCP?  get payload
01226         tcp_payload_t payload;
01227         getTcpPayload(msg->getData(), payload);
01228         ASSERT(!payload.is_empty(), "empty");
01229 
01230         DPRINTF("Got message: %s", payload.command.c_str());
01231         
01232         // get handler for this
01233         msg_handler_fn handler = m_router->getHandler(payload);
01234         if (!handler) {
01235                 return;         // TODO: send back an error code?
01236         }
01237 
01238         // construct state object
01239         smart_ptr<handler_intercept_t> hi =
01240             new handler_intercept_t(m_workerTrc);
01241         ASSERT(hi, "out of memory");
01242         hi->fn = handler;
01243         hi->state.src_env = env;
01244         hi->state.payload = payload;
01245         hi->state.story = m_story;
01246         hi->state.updates = this;
01247         hi->map = &m_threadNames;
01248 
01249         // construct name
01250         ++m_threadCounter;
01251         m_threadNames.insert(m_threadCounter, payload.command.c_str());
01252 
01253 //      DPRINTF("Worker count: %ld", m_workerTrc.getRefCount());
01254 
01255         // pass to thread pool
01256         // TODO: return busy if threadpool is at capacity
01257         void * ctx = (void *) ((handler_intercept_t *) hi);
01258         m_threadpool->addRequest(interceptHandler, ctx);
01259         hi.disown();    // thread will free this when it is done
01260 
01261         // nothing else to do!
01262 }
01263 
01264 
01265 
01266 void
01267 ServerImpl::handleUdp
01268 (
01269 IN const envelope_t& env,
01270 IN const MessageBuffer * msg
01271 )
01272 {
01273         perf::Timer timer("Server::handleUdp");
01274         ASSERT(msg, "null");
01275 
01276         // for now, we handle UDP messages in the main thread
01277         // Any processing that requires different threads should
01278         // be moved to TCP messages!
01279 
01280         // NOTE: this must be threadsafe!  Other threads could be accessing
01281         //      data structures as we are parsing the UDP datagram.
01282         // All data access is behind threadsafe structures.
01283 
01284         // reset parser with input data
01285         ASSERT(m_xdrInput, "should have parser by now");
01286         m_xdrInput->reset((const byte_t *) msg->getData(), msg->getBytes());
01287 
01288         // first thing should be the client (host) global state
01289         xdrbuf::PackletHeader ph = m_xdrInput->getNextPackletHeader();
01290         if ('h' != ph.getName()) {
01291                 // 'h' -- header
01292                 DPRINTF("Malformed UDP packet--no header?");
01293                 return;
01294         }
01295         int32_t host_state[2];  // header is 2 longs: token and clock
01296         m_xdrInput->readInt32s(host_state, 2);
01297 
01298         // verify host token
01299         dword_t token = host_state[0];
01300         conn_id_t conn_id = 0;
01301         host_rec_t hr;
01302 
01303         if (token) {
01304                 // do netlib lookups outside of mutex
01305                 //  (safe since we're on the main thread)
01306                 netlib::connection_info_t ci;
01307                 if (!netlib::getConnectionInfo(token, ci)) {
01308                         DPRINTF("Input UDP token was not valid");
01309                         return;
01310                 }
01311 
01312                 // do address and port match?
01313                 if (ci.address != env.address) {
01314                         DPRINTF("UDP sender does not match token");
01315                         return;
01316                 }
01317 
01318                 // appears to be a valid token!
01319                 conn_id = (conn_id_t) token;
01320         }
01321 
01322         if (conn_id) {
01323                 // appears to be a valid UDP packet from client
01324                 if (!m_hostMgr->getHostByUdpConnectionID(conn_id, hr)) {
01325                         DPRINTF("No host record for conn id: 0x%04lx",
01326                             (long) conn_id);
01327                         return;
01328                 }
01329         
01330                 // remember client's udp clock
01331                 dword_t clock = (dword_t) host_state[1];
01332                 hr.lastUdpReceived = clock;
01333                 m_hostMgr->updateHost(hr);
01334         }
01335 
01336         // parse message
01337         while (!m_xdrInput->endOfStream()) {
01338                 xdrbuf::PackletHeader ph = m_xdrInput->getNextPackletHeader();
01339                 char name = ph.getName();
01340                 //DPRINTF("Parsing packlet with name: '%c'", name);
01341                 switch (name) {
01342 
01343                 case 'a':       // auth attempt
01344                         this->parseHostAuthUdp(hr, ph, m_xdrInput);
01345                         break;
01346 
01347                 case 'c':       // connection attempt
01348                         this->parseHostConnectUdp(env, m_xdrInput);
01349                         break;
01350 
01351                 case 'e':       // client is asking if object exists
01352                         this->parseExistsUdp(hr, m_xdrInput);
01353                         break;
01354 
01355                 case 'g':       // client is sending game data/events
01356                         this->parseGameUdp(hr, ph, m_xdrInput);
01357                         break;
01358 
01359                 case 'p':       // player data
01360                         this->parsePlayerUdp(hr, m_xdrInput);
01361                         break;
01362 
01363                 case 'q':       // client is sending a query request
01364                         this->parseQueryUdp(hr, ph, m_xdrInput);
01365                         break;
01366 
01367                 default:
01368                         DPRINTF("Unknown packlet name! '%c'", name);
01369                 }
01370         }
01371 //      DPRINTF("End of parsing!");
01372 }
01373 
01374 
01375 
01376 void
01377 ServerImpl::parseHostConnectUdp
01378 (
01379 IN const netlib::envelope_t& env,
01380 IO xdrbuf::Input * input
01381 )
01382 {
01383         ASSERT(input, "null");
01384 
01385         // host is attempting to connect
01386         // must be threadsafe!
01387 
01388         DPRINTF("Connection attempt...");
01389         // ASSERT(hr.isValid()) -- host can be invalid for connection attempts!
01390 
01391         // next should be version and token
01392         xdrbuf::PackletHeader ph = input->getNextPackletHeader();
01393         if ('v' != ph.getName()) {
01394                 WAVE_EX(wex);
01395                 wex << "Invalid packlet header during connection";
01396         }
01397         int32_t l[3];
01398         input->readInt32s(l, 3);
01399         if (eProtocol_Long != l[0]) {
01400                 WAVE_EX(wex);
01401                 wex << "Protocol versions do not match";
01402         }
01403         dword_t token = l[1];
01404         if (!token) {
01405                 WAVE_EX(wex);
01406                 wex << "Invalid client connection token";
01407         }
01408         int port = (int) l[2];
01409         if (!port) {
01410                 WAVE_EX(wex);
01411                 wex << "Invalid client UDP port";
01412         }
01413 
01414         DPRINTF("  token = %ld", (long) token);
01415         DPRINTF("  port = %d", port);
01416 
01417         // next should be public key
01418         ph = input->getNextPackletHeader();
01419         if ('p' != ph.getName() ||
01420             xdrbuf::ePacklet_ParentBegin != ph.getType()) {
01421                 WAVE_EX(wex);
01422                 wex << "Invalid packlet header during connection";
01423         }
01424 
01425         // keep reading pieces of the public key...
01426         const long maxSize = 1024;
01427         char buffer[maxSize];
01428         char * offset = buffer;
01429         int read = 0;
01430         while (true) {
01431                 xdrbuf::PackletHeader subph = input->getNextPackletHeader();
01432                 if ('p' == subph.getName()) {
01433                         break;          // end of key pieces
01434                 }
01435                 if (xdrbuf::ePacklet_String != subph.getType()) {
01436                         WAVE_EX(wex);
01437                         wex << "Unexpected packlet while reading public key";
01438                 }
01439 
01440                 int len = subph.getDataCount();
01441                 if (read + len > maxSize - 1) {
01442                         WAVE_EX(wex);
01443                         wex << "Public key is too long!";
01444                 }
01445 
01446                 input->readString(offset, len);
01447                 offset += len;
01448                 read += len;
01449         }
01450         buffer[read] = 0;               // buffer now contains public key
01451 
01452         // should have an end-of-packlet tag now
01453         ph = input->getNextPackletHeader();
01454         if ('c' != ph.getName() ||
01455             xdrbuf::ePacklet_ParentEnd != ph.getType()) {
01456                 WAVE_EX(wex);
01457                 wex << "Invalid end of connection packlet";
01458         }
01459 
01460         // do we already have a connection to this client?
01461         // TODO / WARNING : This is inefficient.  We have to loop through
01462         // all the known hosts to see if one of them matches.  Could make
01463         // this a quicker lookup.  This isn't an issue for the expected
01464         // use case of these libraries (16 connected hosts at max) but
01465         // the host manager (HostManager) should be updated to find
01466         // hosts based on server and port (or other ID) faster--or keep
01467         // this lookup on the client (that is, in this server object).
01468         host_rec_t hr;
01469         aesop::HostManager::iterator_t i;
01470         m_hostMgr->getIterator(i);
01471         while (m_hostMgr->getNextHost(i, hr)) {
01472                 netlib::connection_info_t ci;
01473                 if (netlib::getConnectionInfo(hr.udpConn, ci)) {
01474                         if (env.address == ci.address) {
01475                                 DPRINTF("Already connected!");
01476                                 DPRINTF("  Removing old host record");
01477                                 m_hostMgr->removeHost(hr.udpConn);
01478                         //      return;
01479                         }
01480                 }
01481         }
01482 
01483         // okay, looks to be a legit request!  create a udp connection
01484         std::string server = netlib::getServerFromIP(env.address.ip);
01485         DPRINTF("  server='%s'", server.c_str());
01486         netlib::address_t address(server.c_str(), port);
01487         ASSERT2(address.isValid(), "Invalid client udp address");
01488         conn_id_t hostId = netlib::createUdpRemote(m_udpConn, address);
01489         if (!hostId) {
01490                 WAVE_EX(wex);
01491                 wex << "Failed to create remote UDP connection";
01492         }
01493 
01494         // success!  create host record and queue a connection response
01495         m_hostMgr->addHost(hostId, token, buffer);
01496         if (!m_hostMgr->getHostByUdpConnectionID(hostId, hr)) {
01497                 WAVE_EX(wex);
01498                 wex << "Cannot find a just-added host?";
01499         }
01500         ASSERT(hr.netQueue, "null network request queue for host");
01501         ASSERT(hr.desKey, "null");
01502         ASSERT(hr.publicKey, "null");
01503 
01504         // create context to send back the encrypted DES key
01505         std::string desKey = hr.desKey->serialize();
01506         std::string encryptedDesKey = hr.publicKey->encrypt(desKey.c_str());
01507         smart_ptr<netrq::Request> rc =
01508             new AuthContext(hostId, encryptedDesKey.c_str());
01509         hr.netQueue->addRequest(m_udpClock + 2, rc);
01510 
01511         DPRINTF("Host should have connected!  connId=%08lx", (long) hostId);
01512 }
01513 
01514 
01515 
01516 void
01517 ServerImpl::parseHostAuthUdp
01518 (
01519 IN host_rec_t& hr,
01520 IN const xdrbuf::PackletHeader& ph,
01521 IN xdrbuf::Input * input
01522 )
01523 {
01524         ASSERT(input, "null");
01525 
01526         if (!hr.isValid()) {
01527                 WAVE_EX(wex);
01528                 wex << "Ignoring auth attempt from invalid host";
01529         }
01530         if (eHostMode_Authorized == hr.mode) {
01531                 DPRINTF("Host is already authorized!");
01532                 return;
01533         }
01534         DPRINTF("Host is attempting to authenticate");
01535 
01536         // must be threadsafe!
01537 
01538         // this should be a single string!
01539         if (xdrbuf::ePacklet_String != ph.getType()) {
01540                 WAVE_EX(wex);
01541                 wex << "Invalid packlet type";
01542         }
01543 
01544         const int maxSize = 1024;
01545         int len = ph.getDataCount();
01546         if (len < 1 || len > maxSize - 1) {
01547                 WAVE_EX(wex);
01548                 wex << "Bad password length: " << len;
01549         }
01550 
01551         char buffer[maxSize];
01552         input->readString(buffer, len);
01553         buffer[len] = 0;
01554 
01555         DPRINTF("Received string: %s", buffer);
01556         ASSERT(hr.desKey, "null");
01557         std::string decrypted = hr.desKey->decrypt(buffer);
01558         DPRINTF("Got host password: '%s'", decrypted.c_str());
01559 
01560         hr.mode = eHostMode_Authorized;
01561         m_hostMgr->updateHost(hr);
01562 }
01563 
01564 
01565 
01566 void
01567 ServerImpl::parsePlayerUdp
01568 (
01569 IN host_rec_t& hr,
01570 IO xdrbuf::Input * input
01571 )
01572 {
01573         ASSERT(input, "null");
01574 
01575         // must be threadsafe!
01576 
01577         // check host
01578         if (!hr.isValid()) {
01579                 WAVE_EX(wex);
01580                 wex << "Cannot process player data from invalid host";
01581         }
01582 
01583         // should have ID packlet now
01584         xdrbuf::PackletHeader ph = input->getNextPackletHeader();
01585         char name = ph.getName();
01586         if ('i' != name) {
01587                 WAVE_EX(wex);
01588                 wex << "Invalid packlet in player UDP data.  Expected ID 'i' ";
01589                 wex << "but received '" << name << "' instead.";
01590         }
01591         int32_t playerId = -1;
01592         input->readInt32s(&playerId, 1);
01593         if (playerId < 1) {
01594                 WAVE_EX(wex);
01595                 wex << "Invalid player id: " << playerId;
01596         }
01597 
01598         // get this player's record
01599         player_rec_t rec;
01600         player_rec_t * pr = NULL;
01601         if (m_playerMgr->getPlayerByHostAndPlayerId(hr.udpConn, playerId, rec))
01602         {
01603                 // found the record!
01604                 pr = &rec;
01605         }
01606         if (!pr) {
01607                 // create the player record, but leave pr null
01608                 // this means we will ignore further player data in this udp
01609                 this->newPlayer(hr, playerId);
01610         }
01611 
01612         // parse all data for player
01613         for (;;) {
01614                 ph = input->getNextPackletHeader();
01615                 name = ph.getName();
01616 
01617                 switch (name) {
01618                 case 'p':
01619                         return;         // end of player packlet
01620 
01621                 case 'm':
01622                         {
01623                                 // player wants to move
01624                                 float f[7];
01625                                 input->readFloats(f, 7);
01626                                 float dt = f[3];
01627                                 if (dt > 0.0 && pr && pr->obj) {
01628                                         // TODO: game logic gets this!
01629                                         // for now we do some simple logic here
01630                                         point3d_t delta(f[0], f[1], f[2]);
01631                                         //delta.dump("  requested move");
01632 
01633                                         // NOTE: euler angles are f[4] - f[6]
01634 
01635                                         // what if delta is too big?  Can lead
01636                                         // to funky oscillations.  Keep to max
01637                                         float r2 = dotProduct(delta, delta);
01638                                         float r = sqrt(r2);
01639                                         if (r > 0.4) {
01640                                                 delta = (0.4 / r) * delta;
01641                                         }
01642                                         pr->obj->requestMove(delta, dt);
01643 
01644                                         // set orientation
01645                                         // (we only use y-rotation for now...)
01646                                         point3d_t euler(0, f[5], 0);
01647                                         quaternion_t q;
01648                                         q.setEulerZYX(euler);
01649                                         pr->obj->setOrientation(q);
01650                                 }
01651                         }
01652                         break;
01653 
01654                 default:
01655                         {
01656                                 WAVE_EX(wex);
01657                                 wex << "Unrecognized player packet: '";
01658                                 wex << name << "'";
01659                         }
01660                 }       // end of switch statement
01661         }       // end of packlet header loop
01662 }
01663 
01664 
01665 
01666 void
01667 ServerImpl::parseQueryUdp
01668 (
01669 IN host_rec_t& hr,
01670 IN const xdrbuf::PackletHeader& ph,
01671 IO xdrbuf::Input * input
01672 )
01673 {
01674         ASSERT(input, "null");
01675 
01676         // must be threadsafe!
01677 
01678         // check host
01679         if (!hr.isValid()) {
01680                 WAVE_EX(wex);
01681                 wex << "Cannot process query from invalid host";
01682         }
01683 
01684         // validate packlet header
01685         if (xdrbuf::ePacklet_Int32s != ph.getType()) {
01686                 WAVE_EX(wex);
01687                 wex << "query packlets should be of type int32";
01688         }
01689         dword_t objId;
01690         input->readInt32s((int32_t *) &objId, 1);
01691 
01692         //DPRINTF("Client wants information about object: %ld", objId);
01693         if (!objId) {
01694                 WAVE_EX(wex);
01695                 wex << "Invalid object ID requested";
01696         }
01697 
01698         // add response to this client's request queue
01699         ASSERT(hr.netQueue, "null");
01700         if (hr.netQueue->size() > s_maxNetQueueLength) {
01701                 DPRINTF("Dropping client request");
01702                 return;         // don't add anything, we have enough to do
01703         }
01704 
01705         smart_ptr<netrq::Request> ctx = new QueryContext(objId);
01706         ASSERT(ctx, "out of memory");
01707         hr.netQueue->addRequest(m_udpClock + 2, ctx);
01708 }
01709 
01710 
01711 
01712 void
01713 ServerImpl::parseGameUdp
01714 (
01715 IN host_rec_t& hr,
01716 IN const xdrbuf::PackletHeader& ph,
01717 IO xdrbuf::Input * input
01718 )
01719 {
01720         perf::Timer timer("parseGameUdp");
01721         ASSERT(input, "null");
01722         ASSERT(m_gameLogic, "null");
01723 
01724         m_gameLogic->parseGameData(hr.udpConn, input);
01725 }
01726 
01727 
01728 
01729 void
01730 ServerImpl::parseExistsUdp
01731 (
01732 IN host_rec_t& hr,
01733 IO xdrbuf::Input * input
01734 )
01735 {
01736         ASSERT(input, "null");
01737 
01738         // host is asking if this object exists.
01739         // get the object ID the client is interested in
01740         dword_t objectId;
01741         input->readInt32s((int32_t *) &objectId, 1);
01742         if (objectId < 1) {
01743                 WAVE_EX(wex);
01744                 wex << "Bad object ID: " << objectId;
01745         }
01746 
01747         // create a message ID for this response
01748         char buffer[64];
01749         sprintf(buffer, "er-%ld", (long) objectId);
01750         if (hr.netQueue->containsRequest(buffer))
01751                 return;         // already pending!
01752 
01753         // need to send a message back
01754         smart_ptr<netrq::Request> request =
01755             ObjectExistsResponse::create(buffer, objectId);
01756         ASSERT(request, "null");
01757         hr.netQueue->addRequest(m_udpClock, request);
01758 }
01759 
01760 
01761 
01762 void
01763 ServerImpl::sendUdp
01764 (
01765 void
01766 )
01767 {
01768         // loop through all hosts and send!
01769         // this must be threadsafe.  We don't need to acquire the server
01770         //      mutex since all collections are threadsafe.
01771         // increment UDP clock
01772         ++m_udpClock;
01773 
01774         // set up outbound physics updates
01775         addPhysicsMessagesToQueue(m_udpClock, m_hostMgr, m_playerMgr, m_mapMgr);
01776 
01777         // send to each host
01778         vec_conn_t badConns;            // bad connections
01779         HostManager::iterator_t i;
01780         host_rec_t hr;
01781         m_hostMgr->getIterator(i);
01782         while (m_hostMgr->getNextHost(i, hr)) {
01783                 if (!hr.outbuf || !hr.udpConn) {
01784                         DPRINTF("No way to send");
01785                         continue;       // skip this one
01786                 }
01787 
01788                 // is this still a good host connection?
01789                 if (!netlib::isValidConnection(hr.udpConn)) {
01790                         DPRINTF("Removing host record!  0x%04lx",
01791                             (long) hr.udpConn);
01792                         badConns.push_back(hr.udpConn);
01793                         continue;               // skip host
01794                 }
01795 
01796                 // construct data!
01797                 hr.outbuf->clear();
01798 
01799                 // UDP header is 6 longs:
01800                 //      0: token
01801                 //      1: current server clock
01802                 //      2: last clock received from client
01803                 //      3: server status
01804                 //      4: host status
01805                 //      5: server's listening TCP port
01806                 int32_t l[6];
01807                 l[0] = hr.udpToken;
01808                 l[1] = m_udpClock;
01809                 l[2] = hr.lastUdpReceived;
01810                 l[3] = m_smode;
01811                 l[4] = hr.mode;
01812                 l[5] = m_tcpPort;
01813 
01814                 // send general state ('h' -- header)
01815                 hr.outbuf->addInt32Packlet('h', l, 6);
01816 
01817                 // players on this host
01818                 PlayerManager::iterator_t i;
01819                 m_playerMgr->getIterator(hr.udpConn, i);
01820                 player_rec_t pr;
01821                 while (m_playerMgr->getNextPlayer(i, pr)) {
01822                         // send player status
01823                         dword_t objectId = pr.obj ? pr.obj->getId() : 0;
01824 
01825                         // Player status is 3 longs right now:
01826                         //   0: playerId
01827                         //   1: player state (enum)
01828                         //   2: object ID (can be null)
01829                         int32_t l[3];
01830                         l[0] = pr.playerId;     // playerId
01831                         l[1] = pr.getMode();
01832                         l[2] = objectId;
01833                         hr.outbuf->addInt32Packlet('p', l, 3);
01834                 }
01835 
01836                 // add any pending messages, as space permits
01837                 netrq::sendMessagesFromQueue(hr.outbuf, m_udpClock, hr.netQueue);
01838 
01839                 // other data?
01840 
01841                 // construct netlib message
01842                 // TODO: avoid the copy here!
01843                 smart_ptr<MessageBuffer> msg = MessageBuffer::create();
01844                 ASSERT(msg, "null");
01845                 const byte_t * data = hr.outbuf->getData();
01846                 int bytes = hr.outbuf->getDataBytes();
01847                 ASSERT(bytes > 0, "Bad bytes: %d", bytes);
01848                 msg->reserve(bytes);
01849                 msg->appendData((const char *) data, bytes);
01850                 msg->close();
01851 
01852                 // send
01853                 netlib::enqueueMessage(hr.udpConn, msg);
01854         }
01855 //      DPRINTF("Done sending data to hosts");
01856 
01857         // remove bad hosts
01858         for (vec_conn_t::iterator ib = badConns.begin(); ib != badConns.end();
01859              ++ib) {
01860                 conn_id_t conn_id = *ib;
01861                 DPRINTF("  Removing host with connection ID 0x%04lx",
01862                     (long) conn_id);
01863                 m_hostMgr->removeHost(conn_id);
01864         }
01865 }
01866 
01867 
01868 
01869 const char *
01870 ServerImpl::getStoryGuidTS
01871 (
01872 void
01873 )
01874 {
01875         ASSERT(m_story, "null");
01876 
01877         return m_story->getUuid();
01878 }
01879 
01880 
01881 
01882 bool
01883 ServerImpl::isAdmin
01884 (
01885 IN conn_id_t udpConnID,
01886 IN int playerId
01887 )
01888 {
01889         ASSERT(udpConnID, "null");
01890         ASSERT(playerId > 0, "bad player id: %d", playerId);
01891         ASSERT(m_playerMgr, "null");
01892 
01893         // NOTE: this method is threadsafe, since playerMgr is threadsafe
01894         player_rec_t pr;
01895         if (!m_playerMgr->getPlayerByHostAndPlayerId(udpConnID, playerId, pr)) {
01896                 return false;
01897         }
01898         return ePrivilege_Admin & pr.flags;
01899 }
01900 
01901 
01902 
01903 void
01904 ServerImpl::checkPlayers
01905 (
01906 void
01907 )
01908 {
01909         // threadsafe!
01910 
01911         // TODO: this should go away!  Game Logic object should handle this
01912         // in its tick() call.
01913 
01914         // loop through all players
01915         PlayerManager::iterator_t i;
01916         m_playerMgr->getIterator(0, i);
01917         player_rec_t pr;
01918         while (m_playerMgr->getNextPlayer(i, pr)) {
01919                 if (pr.isAuthenticated() &&
01920                     !pr.isInMap()) {
01921                         try {
01922                                 // game logic object can handle this
01923                                 if (m_gameLogic->requestPlayerStartTS(pr)) {
01924                                         m_playerMgr->updatePlayer(pr);
01925                                 }
01926                         } catch (std::exception& e) {
01927                                 DPRINTF("Exception: %s", e.what());
01928                         }
01929                 }
01930         }
01931 }
01932 
01933 
01934 
01935 void
01936 ServerImpl::sendBroadcast
01937 (
01938 void
01939 )
01940 {
01941         ASSERT2(m_udpBcast, "No UDP broadcast point?");
01942         ASSERT2(m_xdrBcast, "null");
01943         ASSERT2(m_story, "null");
01944 
01945 //      DPRINTF("Broadcasting!");
01946 
01947         m_xdrBcast->clear();
01948 
01949         // write public name ('server' for player)
01950         int len = m_publicName.length();
01951         m_xdrBcast->addStringPacklet('s', m_publicName.c_str(), len);
01952 
01953         // write story UUID
01954         const char * uuid = m_story->getUuid();
01955         ASSERT2(uuid, "null");
01956         len = strlen(uuid);
01957         m_xdrBcast->addStringPacklet('u', uuid, len);
01958 
01959         // write port
01960         int32_t port = m_udpPort;
01961         m_xdrBcast->addInt32Packlet('p', &port, 1);
01962 
01963         // construct netlib message
01964         // TODO: avoid the copy here!
01965         smart_ptr<MessageBuffer> msg = MessageBuffer::create();
01966         ASSERT(msg, "null");
01967         const byte_t * data = m_xdrBcast->getData();
01968         int bytes = m_xdrBcast->getDataBytes();
01969         ASSERT(bytes > 0, "Bad bytes: %d", bytes);
01970         msg->reserve(bytes);
01971         msg->appendData((const char *) data, bytes);
01972         msg->close();
01973 
01974         // send
01975         netlib::enqueueMessage(m_udpBcast, msg);
01976         
01977         // that's it!
01978 }
01979 
01980 
01981 
01982 ////////////////////////////////////////////////////////////////////////////////
01983 //
01984 //      public API
01985 //
01986 ////////////////////////////////////////////////////////////////////////////////
01987 
01988 smart_ptr<Server>
01989 Server::create
01990 (
01991 IN smart_ptr<Datahash>& params,
01992 IN smart_ptr<ServerGameLogic>& gameLogic
01993 )
01994 {
01995         ASSERT(params, "null");
01996         ASSERT(gameLogic, "null");
01997 
01998         smart_ptr<ServerImpl> local = new ServerImpl;
01999         ASSERT(local, "out of memory");
02000 
02001         // initialize the server
02002         local->initialize(params, gameLogic);
02003 
02004         return local;
02005 }
02006 
02007 
02008 };      // aesop namespace
02009