首先看一下org.menacheri.jetserver.server包
接口Server.java
package org.menacheri.jetserver.server; import java.net.InetSocketAddress; import org.menacheri.jetserver.app.Session; public interface Server { public interface TransmissionProtocol{ } //内部接口,只有域没有方法 public enum TRANSMISSION_PROTOCOL implements TransmissionProtocol { TCP,UDP; } //枚举类实现了传输协议的种类 TransmissionProtocol getTransmissionProtocol(); void startServer() throws Exception; void startServer(int port) throws Exception; void startServer(InetSocketAddress socketAddress) throws Exception; void stopServer() throws Exception; InetSocketAddress getSocketAddress(); Session getSession(); void setSession(Session session); }
接口ServerManager.java
package org.menacheri.jetserver.server; /** * A generic interface used to manage a server. * @author Abraham Menacherry * */ public interface ServerManager { public void startServers(int tcpPort, int flashPort, int udpPort) throws Exception; public void startServers() throws Exception; /** * Used to stop the server and manage cleanup of resources. * */ public void stopServers() throws Exception; }
ServerMangerImpl.java
import java.util.Set; import org.menacheri.jetserver.context.AppContext; import org.menacheri.jetserver.server.ServerManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServerManagerImpl implements ServerManager { private Set<AbstractNettyServer> servers; private static final Logger LOG = LoggerFactory.getLogger(ServerManagerImpl.class); public ServerManagerImpl() { servers = new HashSet<AbstractNettyServer>(); //用set接口指向了HashSet实例 } @Override public void startServers(int tcpPort, int flashPort, int udpPort) throws Exception { if(tcpPort > 0) { AbstractNettyServer tcpServer = (AbstractNettyServer)AppContext.getBean(AppContext.TCP_SERVER); tcpServer.startServer(tcpPort); servers.add(tcpServer); } if(flashPort > 0) { AbstractNettyServer flashServer = (AbstractNettyServer)AppContext.getBean(AppContext.FLASH_POLICY_SERVER); flashServer.startServer(flashPort); servers.add(flashServer); } if(udpPort > 0) { AbstractNettyServer udpServer = (AbstractNettyServer)AppContext.getBean(AppContext.UDP_SERVER); udpServer.startServer(udpPort); servers.add(udpServer); } } @Override public void startServers() throws Exception { AbstractNettyServer tcpServer = (AbstractNettyServer)AppContext.getBean(AppContext.TCP_SERVER); tcpServer.startServer(); servers.add(tcpServer); AbstractNettyServer flashServer = (AbstractNettyServer)AppContext.getBean(AppContext.FLASH_POLICY_SERVER); flashServer.startServer(); servers.add(flashServer); AbstractNettyServer udpServer = (AbstractNettyServer)AppContext.getBean(AppContext.UDP_SERVER); udpServer.startServer(); servers.add(udpServer); } @Override public void stopServers() throws Exception { for(AbstractNettyServer nettyServer: servers){ try { nettyServer.stopServer(); } catch (Exception e) { LOG.error("Unable to stop server {} due to error {}", nettyServer,e); throw e; } } } }
首先看一下Server和ServerManager这两个接口:
(1)ServerManager用于管理多个Server,提供对外的访问接口。
(2)Server中的方法是包级保护的,这个包其实只有Server和ServerManger两个类,所以Server中的方法只能被ServerManger访问。
(3)这两个接口提供了服务类通用的方法,如开始服务,结束服务,返回套接字地址,传输协议,Session和设置Session等。
接下来看一下org.menacheri.jetserver.server.netty这个包,
接口NettyServer.java
package org.menacheri.jetserver.server.netty; import org.jboss.netty.bootstrap.Bootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipelineFactory; import org.menacheri.jetserver.server.Server; /** * An interface specific to the JBoss Netty implementation. It will be * implemented by a class that will start up a Netty server at a specified port. * * @author Abraham Menacherry * */ public interface NettyServer extends Server { /** * Creates a {@link ServerBootstrap} object which is used to start a server. * * @return Returns the created {@link ServerBootstrap}. */ public Bootstrap createServerBootstrap(); /** * If thread pools or TCP/IP parameters or the pipeline factory need to be * modified then it is this method that needs to be overriden. * * @param optionsList * Used to set tcp ip options like noDelay etc. */ public void configureServerBootStrap(String[] optionsList); //这个方法用于修改TCP/IP参数以及pipeline factory /** * createServerBootstrap will create a pipeline factory and save it as a * class variable. This method can then be used to retrieve that value. * * @return Returns the channel pipeline factory that is associated with this * netty server. */ public ChannelPipelineFactory getPipelineFactory(); /** * Method can be used to set the pipeline factory that is to be used by the * netty server. * * @param factory * The factory which will create a pipeline on each incoming * connection. */ public void setPipelineFactory(ChannelPipelineFactory factory); /** * @return Returns the created server bootstrap object. */ public Bootstrap getServerBootstrap(); /** * Sets the server bootstrap, could be TCP, UDP bootstrap. * * @param serverBootstrap */ public void setServerBootstrap(Bootstrap serverBootstrap); }
NettyServer接口提供了Netty服务器特有的一些方法,如设置和返回bootstrap和pipelineFactory。
抽象类AbstractNettyServer.java
package org.menacheri.jetserver.server.netty; import java.net.InetSocketAddress; import org.jboss.netty.bootstrap.Bootstrap; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.menacheri.jetserver.app.Session; import org.menacheri.jetserver.service.GameAdminService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Required; public abstract class AbstractNettyServer implements NettyServer { private static final Logger LOG = LoggerFactory.getLogger(AbstractNettyServer.class); public static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("JETSERVER-CHANNELS"); /* 需要被子类继承的域用protected访问权限 */ protected Session session; protected InetSocketAddress socketAddress; protected int portNumber = 18090; protected Bootstrap serverBootstrap; protected ChannelPipelineFactory pipelineFactory; protected GameAdminService gameAdminService; public AbstractNettyServer() { super(); } @Override public void stopServer() throws Exception { LOG.debug("In stopServer method of class: {}", this.getClass().getName()); ChannelGroupFuture future = ALL_CHANNELS.close(); try { future.await(); } catch (InterruptedException e) { LOG.error("Execption occurred while waiting for channels to close: {}",e); } serverBootstrap.releaseExternalResources(); gameAdminService.shutdown(); } @Override public void configureServerBootStrap(String[] optionsList) { // For clients who do not use spring. if(null == serverBootstrap){ createServerBootstrap(); } serverBootstrap.setPipelineFactory(pipelineFactory); if (null != optionsList && optionsList.length > 0) { for (String option : optionsList) { serverBootstrap.setOption(option, true); } } } public int getPortNumber(String[] args) { if (null == args || args.length < 1) { return portNumber; } try { return Integer.parseInt(args[0]); } catch (NumberFormatException e) { LOG.error("Exception occurred while " + "trying to parse the port number: {}", args[0]); LOG.error("NumberFormatException: {}",e); throw e; } } @Override public Bootstrap getServerBootstrap() { return serverBootstrap; } @Override public void setServerBootstrap(Bootstrap serverBootstrap) { this.serverBootstrap = serverBootstrap; } @Override public ChannelPipelineFactory getPipelineFactory() { return pipelineFactory; } @Override @Required public void setPipelineFactory(ChannelPipelineFactory factory) { pipelineFactory = factory; } public int getPortNumber() { return portNumber; } public void setPortNumber(int portNumber) { this.portNumber = portNumber; } public GameAdminService getGameAdminService() { return gameAdminService; } public void setGameAdminService(GameAdminService gameAdminService) { this.gameAdminService = gameAdminService; } @Override public InetSocketAddress getSocketAddress() { return socketAddress; } public void setInetAddress(InetSocketAddress inetAddress) { this.socketAddress = inetAddress; } @Override public String toString() { return "NettyServer [socketAddress=" + socketAddress + ", portNumber=" + portNumber + "]"; } @Override public Session getSession() { return session; } @Override public void setSession(Session session) { this.session = session; } }
AbstractNettyServer抽象类真正实现了了一个Netty服务器,增加了session,socketaddress,pipelineFactory,bootstrap等域,并实现了通用的接口。一些特有的接口如startServer(),stopServer(),creatServerBootstrap()和getTransmissionProtocol()等方法需要在子类中实现,相应的有TCP服务器,UDP服务器和flash服务器。
NettyTcpServer.java
package org.menacheri.jetserver.server.netty; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.Bootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.menacheri.jetserver.concurrent.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This class is used for TCP IP communications with client. It uses Netty tcp * server bootstrap for this. * * @author Abraham Menacherry * */ public class NettyTCPServer extends AbstractNettyServer { private static final Logger LOG = LoggerFactory.getLogger(NettyTCPServer.class); private String[] args; public NettyTCPServer() { } public void startServer(int port) throws Exception { portNumber = port; startServer(args); } @Override public void startServer() throws Exception { startServer(args); } public void startServer(String[] args) throws Exception { int portNumber = getPortNumber(args); InetSocketAddress socketAddress = new InetSocketAddress(portNumber); startServer(socketAddress); } public Bootstrap createServerBootstrap() { // TODO The thread pools should be injected from spring. serverBootstrap = new ServerBootstrap( new NioServerSocketChannelFactory(Executors .newCachedThreadPool(new NamedThreadFactory( "TCP-Server-Boss")), Executors .newCachedThreadPool(new NamedThreadFactory( "TCP-Server-Worker")))); return serverBootstrap; } @Override public TransmissionProtocol getTransmissionProtocol() { return TRANSMISSION_PROTOCOL.TCP; } @Override public void startServer(InetSocketAddress socketAddress) { this.socketAddress = socketAddress; if (null == args || args.length == 0) { String[] optionsList = new String[2]; optionsList[0] = "child.tcpNoDelay"; optionsList[1] = "child.keepAlive"; configureServerBootStrap(optionsList); } else { configureServerBootStrap(args); } try { ((ServerBootstrap) serverBootstrap).bind(socketAddress); } catch (ChannelException e) { LOG.error("Unable to start TCP server due to error {}",e); throw e; } } public void stopServer() throws Exception { LOG.debug("In stopServer method of class: {}", this.getClass().getName()); ChannelGroupFuture future = ALL_CHANNELS.close(); try { future.await(); } catch (InterruptedException e) { LOG.error("Execption occurred while waiting for channels to close: {}",e); } super.stopServer(); } public String[] getArgs() { return args; } public void setArgs(String[] args) { this.args = args; } @Override public String toString() { return "NettyTCPServer [args=" + Arrays.toString(args) + ", socketAddress=" + socketAddress + ", portNumber=" + portNumber + "]"; } }
NettyUdpServer.java
package org.menacheri.jetserver.server.netty; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.Bootstrap; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.menacheri.jetserver.concurrent.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This server does UDP connection less broadcast. Since it does not store the * connection, each call to a channel write must also contain the remote socket * address <code>e.getChannel().write("Message", e.getRemoteAddress())</code>. * Since it uses the same channel for all incoming connections, the handlers * cannot be modified refer to <a * href="http://www.jboss.org/netty/community.html#nabble-f685700">nabble * post</a> * * @author Abraham Menacherry * */ public class NettyUDPServer extends AbstractNettyServer { private static final Logger LOG = LoggerFactory.getLogger(NettyUDPServer.class); private FixedReceiveBufferSizePredictorFactory bufferSizePredictor; private String[] args; /** * The connected channel for this server. This reference can be used to * shutdown this server. */ private Channel channel; public NettyUDPServer() { } @Override public void startServer(int port) throws Exception { portNumber = port; startServer(args); } @Override public void startServer() throws Exception { startServer(args); } public void startServer(String[] args) throws Exception { int portNumber = getPortNumber(args); InetSocketAddress socketAddress = new InetSocketAddress(portNumber); startServer(socketAddress); } @Override public Bootstrap createServerBootstrap() { serverBootstrap = new ConnectionlessBootstrap( new NioDatagramChannelFactory(Executors .newCachedThreadPool(new NamedThreadFactory( "UDP-Server-Worker")))); return serverBootstrap; } @Override public void stopServer() throws Exception { if(null != channel) { channel.close(); } super.stopServer(); } public FixedReceiveBufferSizePredictorFactory getBufferSizePredictor() { return bufferSizePredictor; } public void setBufferSizePredictor( FixedReceiveBufferSizePredictorFactory bufferSizePredictor) { this.bufferSizePredictor = bufferSizePredictor; } @Override public TransmissionProtocol getTransmissionProtocol() { return TRANSMISSION_PROTOCOL.UDP; } @Override public void startServer(InetSocketAddress socketAddress) { this.socketAddress = socketAddress; //TODO these should be set from spring serverBootstrap.setOption("broadcast", "false"); serverBootstrap.setOption("receiveBufferSizePredictorFactory", bufferSizePredictor); serverBootstrap.setOption("sendBufferSize", 65536); serverBootstrap.setOption("receiveBufferSize", 65536); configureServerBootStrap(args); try { channel = ((ConnectionlessBootstrap) serverBootstrap) .bind(socketAddress); } catch (ChannelException e) { LOG.error("Unable to start UDP server due to error {}",e); throw e; } } public String[] getArgs() { return args; } public void setArgs(String[] args) { this.args = args; } @Override public String toString() { return "NettyUDPServer [args=" + Arrays.toString(args) + ", socketAddress=" + socketAddress + ", portNumber=" + portNumber + "]"; } }
FlashPolicyServer.java
package org.menacheri.jetserver.server.netty; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.Bootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.menacheri.jetserver.concurrent.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FlashPolicyServer extends NettyTCPServer { private static final Logger LOG = LoggerFactory.getLogger(FlashPolicyServer.class); private int portNumber = 843; public int getPortNumber(String[] args) { if (null == args || args.length != 2) { LOG.debug("Going to use port: {}", portNumber); return portNumber; } try { int portNumberArg = Integer.parseInt(args[1]); LOG.debug("Going to use port: {}", portNumberArg); return portNumberArg; } catch (NumberFormatException e) { LOG.error("Exception occurred while " + "trying to parse the port number: {}, {}", args[0], e); throw e; } } public Bootstrap createServerBootstrap() { // TODO The thread pools should be injected from spring. serverBootstrap = new ServerBootstrap( new NioServerSocketChannelFactory(Executors .newFixedThreadPool(1,new NamedThreadFactory( "Flash-Server-Boss")), Executors .newFixedThreadPool(1,new NamedThreadFactory( "Flash-Server-Worker")))); return serverBootstrap; } public int getPortNumber() { return portNumber; } public void setPortNumber(int portNumber) { this.portNumber = portNumber; } }
所有评论(0)