Merge branch 'Memcached_Client'
Conflicts: MemcachedClient/src/client.xml MemcachedClient/src/common/MessageManager.java MemcachedClient/src/log4j.properties MemcachedClient/src/messageBody/memcachedmsg.java MemcachedClient/src/messageBody/requestMsg.java MemcachedClient/src/r_memcached/Client.java MemcachedClient/src/r_memcached/MClientPipelineFactory.java MemcachedClient/src/server/ClientConfig.java MemcachedClient/src/server/LockKey.java MemcachedClient/src/server/MDecoder.java MemcachedClient/src/server/MEncoder.java MemcachedClient/src/server/MServerHandler.java MemcachedClient/src/server/MServerPipelineFactory.java MemcachedClient/src/server/NetMsg.java MemcachedClient/src/server/Server.java R-Memcached/protobuf/memcached.proto R-Memcached/protobuf/requestMsg.proto R-Memcached/src/client.xml R-Memcached/src/client/Client.java R-Memcached/src/client/ClientMgr.java R-Memcached/src/client/MClientHandler.java R-Memcached/src/client/MClientPipelineFactory.java R-Memcached/src/common/EMSGID.java R-Memcached/src/common/MessageManager.java R-Memcached/src/common/RegisterHandler.java R-Memcached/src/config.properties R-Memcached/src/log4j.properties R-Memcached/src/memcached/ByteBufArrayInputStream.java R-Memcached/src/memcached/ContextObjectInputStream.java R-Memcached/src/memcached/ErrorHandler.java R-Memcached/src/memcached/LineInputStream.java R-Memcached/src/memcached/MemcachedClient.java R-Memcached/src/memcached/NativeHandler.java R-Memcached/src/memcached/NestedIOException.java R-Memcached/src/memcached/SockIO.java R-Memcached/src/messageBody/memcachedmsg.java R-Memcached/src/messageBody/requestMsg.java R-Memcached/src/server/ClientConfig.java R-Memcached/src/server/LockKey.java R-Memcached/src/server/MDecoder.java R-Memcached/src/server/MEncoder.java R-Memcached/src/server/MServerHandler.java R-Memcached/src/server/MServerPipelineFactory.java R-Memcached/src/server/MemcachedMain.java R-Memcached/src/server/NetMsg.java R-Memcached/src/server/Server.java R-Memcached/src/server/memSession.java R-Memcached/src/test/MainTest.java src/client.xml src/client/Client.java src/client/MClientPipelineFactory.java src/common/MessageManager.java src/log4j.properties src/messageBody/memcachedmsg.java src/messageBody/requestMsg.java src/server/ClientConfig.java src/server/LockKey.java src/server/MDecoder.java src/server/MEncoder.java src/server/MServerHandler.java src/server/MServerPipelineFactory.java src/server/NetMsg.java src/server/Server.java
This commit is contained in:
commit
05a95ad014
|
@ -0,0 +1,27 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<clients>
|
||||||
|
<client>
|
||||||
|
<id>0000</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30000</client_port>
|
||||||
|
<memcached>127.0.0.1:20000</memcached>
|
||||||
|
</client>
|
||||||
|
<client>
|
||||||
|
<id>0001</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30001</client_port>
|
||||||
|
<memcached>127.0.0.1:20001</memcached>
|
||||||
|
</client>
|
||||||
|
<client>
|
||||||
|
<id>0002</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30002</client_port>
|
||||||
|
<memcached>127.0.0.1:20002</memcached>
|
||||||
|
</client>
|
||||||
|
<client>
|
||||||
|
<id>0003</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30003</client_port>
|
||||||
|
<memcached>127.0.0.1:20003</memcached>
|
||||||
|
</client>
|
||||||
|
</clients>
|
|
@ -0,0 +1,20 @@
|
||||||
|
package common;
|
||||||
|
|
||||||
|
public enum EMSGID
|
||||||
|
{
|
||||||
|
nr_read,
|
||||||
|
nr_read_res,
|
||||||
|
nr_write,
|
||||||
|
nr_write_copy,
|
||||||
|
nr_write_res,
|
||||||
|
nr_connected_mem,
|
||||||
|
nr_connected_mem_back,
|
||||||
|
|
||||||
|
nm_connected,
|
||||||
|
nm_connected_mem_back,
|
||||||
|
nm_connected_web_back,
|
||||||
|
nm_read,
|
||||||
|
nm_write_1,
|
||||||
|
nm_write_1_res,
|
||||||
|
nm_write_2
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package common;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.google.protobuf.GeneratedMessage;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author King
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MessageManager
|
||||||
|
{
|
||||||
|
private static Map<Integer, MessageLite> messageMap = new HashMap<Integer, MessageLite>();
|
||||||
|
|
||||||
|
public static void addMessageCla(int id, Class<? extends GeneratedMessage> msgCla) throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||||
|
if(msgCla==null)
|
||||||
|
return;
|
||||||
|
Method method = msgCla.getMethod("getDefaultInstance");
|
||||||
|
MessageLite lite = (MessageLite) method.invoke(null, null);
|
||||||
|
messageMap.put(id, lite);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static MessageLite getMessage(Integer id, byte[] body) throws InvalidProtocolBufferException {
|
||||||
|
MessageLite list = messageMap.get(id);
|
||||||
|
if (list == null)
|
||||||
|
{
|
||||||
|
System.err.println(id.toString());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return list.newBuilderForType().mergeFrom(body).build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
package common;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import messageBody.memcachedmsg.nm_Connected;
|
||||||
|
import messageBody.requestMsg.nr_Connected_mem_back;
|
||||||
|
import messageBody.requestMsg.nr_Read_res;
|
||||||
|
import messageBody.requestMsg.nr_write_res;
|
||||||
|
|
||||||
|
import com.google.protobuf.GeneratedMessage;
|
||||||
|
|
||||||
|
|
||||||
|
public class RegisterHandler
|
||||||
|
{
|
||||||
|
public static void initHandler()
|
||||||
|
{
|
||||||
|
initHandler(EMSGID.nm_connected.ordinal(), nm_Connected.class);
|
||||||
|
initHandler(EMSGID.nr_connected_mem_back.ordinal(), nr_Connected_mem_back.class);
|
||||||
|
initHandler(EMSGID.nr_read_res.ordinal(), nr_Read_res.class);
|
||||||
|
initHandler(EMSGID.nr_write_res.ordinal(), nr_write_res.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void initHandler(int id, Class<? extends GeneratedMessage> msgCla)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
MessageManager.addMessageCla(id, msgCla);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
replicasNum = 4
|
|
@ -0,0 +1,4 @@
|
||||||
|
log4j.rootLogger=WARN, stdout
|
||||||
|
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||||
|
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,39 @@
|
||||||
|
package r_memcached;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import org.jboss.netty.bootstrap.ClientBootstrap;
|
||||||
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
|
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
||||||
|
|
||||||
|
public class Client
|
||||||
|
{
|
||||||
|
ClientBootstrap bootstrap;
|
||||||
|
ChannelFuture channelFuture;
|
||||||
|
String host;
|
||||||
|
int port;
|
||||||
|
int id;
|
||||||
|
|
||||||
|
public boolean init(String host, int port)
|
||||||
|
{
|
||||||
|
bootstrap = new ClientBootstrap(
|
||||||
|
new NioClientSocketChannelFactory(
|
||||||
|
Executors.newCachedThreadPool(),
|
||||||
|
Executors.newCachedThreadPool()));
|
||||||
|
// Set up the event pipeline factory.
|
||||||
|
bootstrap.setPipelineFactory(new MClientPipelineFactory());
|
||||||
|
// Start the connection attempt.
|
||||||
|
channelFuture = bootstrap.connect(new InetSocketAddress(host, port));
|
||||||
|
return channelFuture.isSuccess();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
channelFuture.awaitUninterruptibly();
|
||||||
|
if (!channelFuture.isSuccess())
|
||||||
|
{
|
||||||
|
channelFuture.getCause().printStackTrace();
|
||||||
|
}
|
||||||
|
channelFuture.getChannel().getCloseFuture().awaitUninterruptibly();
|
||||||
|
bootstrap.releaseExternalResources();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,81 @@
|
||||||
|
package r_memcached;
|
||||||
|
|
||||||
|
|
||||||
|
import messageBody.requestMsg.nr_Connected_mem;
|
||||||
|
import messageBody.requestMsg.nr_Read_res;
|
||||||
|
import messageBody.requestMsg.nr_write_res;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
|
import org.jboss.netty.channel.ExceptionEvent;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||||
|
|
||||||
|
import server.NetMsg;
|
||||||
|
import server.webSession;
|
||||||
|
import common.EMSGID;
|
||||||
|
|
||||||
|
|
||||||
|
public class MClientHandler extends SimpleChannelUpstreamHandler
|
||||||
|
{
|
||||||
|
|
||||||
|
private static int ticks=0;
|
||||||
|
private static long diffTime = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||||
|
{
|
||||||
|
nr_Connected_mem.Builder builder = nr_Connected_mem.newBuilder();
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMsgID(EMSGID.nr_connected_mem);
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
|
||||||
|
e.getChannel().write(send);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||||
|
{
|
||||||
|
if (!(e.getMessage() instanceof NetMsg))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
webSession.getInstance().addSession(e);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// NetMsg msg = (NetMsg)e.getMessage();
|
||||||
|
// if (msg.getMsgID() == EMSGID.nr_read_res || msg.getMsgID() == EMSGID.nr_write_res)
|
||||||
|
// {
|
||||||
|
// method();
|
||||||
|
// }
|
||||||
|
// else
|
||||||
|
// {
|
||||||
|
// webSession.getInstance().addSession(e);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized static void method()
|
||||||
|
{
|
||||||
|
if (ticks == 0)
|
||||||
|
{
|
||||||
|
diffTime = System.nanoTime();
|
||||||
|
}
|
||||||
|
ticks++;
|
||||||
|
if (ticks == 5000)
|
||||||
|
{
|
||||||
|
System.out.println((System.nanoTime()-diffTime)/1000000.0);
|
||||||
|
ticks = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||||
|
{
|
||||||
|
if (e.getChannel().getLocalAddress() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
webSession.getInstance().removeClientChannel(e.getChannel());
|
||||||
|
e.getChannel().close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package r_memcached;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
|
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||||
|
import org.jboss.netty.channel.Channels;
|
||||||
|
|
||||||
|
import server.MDecoder;
|
||||||
|
import server.MEncoder;
|
||||||
|
|
||||||
|
public class MClientPipelineFactory implements ChannelPipelineFactory
|
||||||
|
{
|
||||||
|
|
||||||
|
public ChannelPipeline getPipeline() throws Exception
|
||||||
|
{
|
||||||
|
ChannelPipeline pipeline = Channels.pipeline();
|
||||||
|
|
||||||
|
pipeline.addLast("decoder", new MDecoder());
|
||||||
|
pipeline.addLast("encoder", new MEncoder());
|
||||||
|
pipeline.addLast("handler", new MClientHandler());
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
package r_memcached;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import server.ClientConfig;
|
||||||
|
|
||||||
|
public class MemcachedMgr
|
||||||
|
{
|
||||||
|
public HashMap<Integer, Client> m_mapLocalClients;
|
||||||
|
static MemcachedMgr memcachedMgr;
|
||||||
|
public static int nCopyNode;
|
||||||
|
|
||||||
|
public static MemcachedMgr getInstance()
|
||||||
|
{
|
||||||
|
if (memcachedMgr == null)
|
||||||
|
{
|
||||||
|
memcachedMgr = new MemcachedMgr();
|
||||||
|
}
|
||||||
|
return memcachedMgr;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getSize()
|
||||||
|
{
|
||||||
|
return m_mapLocalClients.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void init(HashMap<Integer, ClientConfig> hm)
|
||||||
|
{
|
||||||
|
m_mapLocalClients = new HashMap<Integer, Client>();
|
||||||
|
|
||||||
|
Iterator iter = hm.entrySet().iterator();
|
||||||
|
while (iter.hasNext())
|
||||||
|
{
|
||||||
|
Entry entry = (Entry) iter.next();
|
||||||
|
ClientConfig cc = (ClientConfig)entry.getValue();
|
||||||
|
|
||||||
|
Client lc = new Client();
|
||||||
|
lc.host = cc.host;
|
||||||
|
lc.port = cc.client_port;
|
||||||
|
lc.id = cc.id;
|
||||||
|
m_mapLocalClients.put(lc.id, lc);
|
||||||
|
|
||||||
|
if(lc.init(lc.host, lc.port))
|
||||||
|
{
|
||||||
|
System.out.println("client connected successful");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
public class ClientConfig
|
||||||
|
{
|
||||||
|
public int id;
|
||||||
|
public String host;
|
||||||
|
public int client_port;
|
||||||
|
public String memcached;
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
public class LockKey {
|
||||||
|
public Integer memNumber = 0;
|
||||||
|
public Integer ncount = 0;
|
||||||
|
public Integer state = unLock;
|
||||||
|
|
||||||
|
LockKey(Integer num, Integer count, Integer s)
|
||||||
|
{
|
||||||
|
memNumber = num;
|
||||||
|
ncount = count;
|
||||||
|
state = s;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final static Integer unLock = 0;
|
||||||
|
public final static Integer badLock =1;
|
||||||
|
public final static Integer waitLock =2;
|
||||||
|
}
|
|
@ -0,0 +1,87 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.logging.FileHandler;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
import java.util.logging.SimpleFormatter;
|
||||||
|
|
||||||
|
public class LoggerUtil
|
||||||
|
{
|
||||||
|
/** 存放的文件夹 **/
|
||||||
|
private static String file_name = "webServer";
|
||||||
|
static Logger logger;
|
||||||
|
/**
|
||||||
|
* 得到要记录的日志的路径及文件名称
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private static String getLogName()
|
||||||
|
{
|
||||||
|
StringBuffer logPath = new StringBuffer();
|
||||||
|
logPath.append(System.getProperty("user.dir"));
|
||||||
|
logPath.append("\\"+file_name);
|
||||||
|
File file = new File(logPath.toString());
|
||||||
|
if (!file.exists())
|
||||||
|
file.mkdir();
|
||||||
|
|
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||||
|
logPath.append("\\"+sdf.format(new Date())+".log");
|
||||||
|
|
||||||
|
return logPath.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 配置Logger对象输出日志文件路径
|
||||||
|
* @param logger
|
||||||
|
* @throws SecurityException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void setLogingProperties(Logger logger) throws SecurityException, IOException {
|
||||||
|
setLogingProperties(logger,Level.ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 配置Logger对象输出日志文件路径
|
||||||
|
* @param logger
|
||||||
|
* @param level 在日志文件中输出level级别以上的信息
|
||||||
|
* @throws SecurityException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void setLogingProperties(Logger logger,Level level)
|
||||||
|
{
|
||||||
|
FileHandler fh;
|
||||||
|
try {
|
||||||
|
fh = new FileHandler(getLogName(),true);
|
||||||
|
logger.addHandler(fh);//日志输出文件
|
||||||
|
//logger.setLevel(level);
|
||||||
|
fh.setFormatter(new SimpleFormatter());//输出格式
|
||||||
|
//logger.addHandler(new ConsoleHandler());//输出到控制台
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
logger.log(Level.SEVERE, "安全性错误", e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.log(Level.SEVERE,"读取文件日志错误", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static Logger getInstance()
|
||||||
|
{
|
||||||
|
if (logger == null)
|
||||||
|
{
|
||||||
|
logger = Logger.getLogger("webserver");
|
||||||
|
try {
|
||||||
|
LoggerUtil.setLogingProperties(logger);
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return logger;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||||
|
|
||||||
|
import server.NetMsg;
|
||||||
|
|
||||||
|
import common.EMSGID;
|
||||||
|
|
||||||
|
public class MDecoder extends FrameDecoder
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||||
|
if (buffer.readableBytes() < 4)
|
||||||
|
{
|
||||||
|
return null;//(1)
|
||||||
|
}
|
||||||
|
int dataLength = buffer.getInt(buffer.readerIndex());
|
||||||
|
if (buffer.readableBytes() < dataLength + 4)
|
||||||
|
{
|
||||||
|
return null;//(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer.skipBytes(4);//(3)
|
||||||
|
int id = buffer.readInt();
|
||||||
|
byte[] decoded = new byte[dataLength-4];
|
||||||
|
|
||||||
|
buffer.readBytes(decoded);
|
||||||
|
NetMsg msg = new NetMsg(decoded, id);//(4)
|
||||||
|
msg.setMsgID(EMSGID.values()[id]);
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffers;
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
|
||||||
|
|
||||||
|
import server.NetMsg;
|
||||||
|
|
||||||
|
public class MEncoder extends OneToOneEncoder
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception
|
||||||
|
{
|
||||||
|
if (!(msg instanceof NetMsg))
|
||||||
|
{
|
||||||
|
return msg;//(1)
|
||||||
|
}
|
||||||
|
NetMsg res = (NetMsg)msg;
|
||||||
|
byte[] data = res.getBytes();
|
||||||
|
int dataLength = data.length+4;
|
||||||
|
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)
|
||||||
|
buf.writeInt(dataLength);
|
||||||
|
buf.writeInt(res.msgID.ordinal());
|
||||||
|
buf.writeBytes(data);
|
||||||
|
return buf;//(3)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
|
||||||
|
import messageBody.requestMsg.nr_Read_res;
|
||||||
|
import messageBody.requestMsg.nr_write_res;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
|
import org.jboss.netty.channel.ExceptionEvent;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||||
|
|
||||||
|
import server.NetMsg;
|
||||||
|
|
||||||
|
import common.EMSGID;
|
||||||
|
|
||||||
|
|
||||||
|
public class MServerHandler extends SimpleChannelUpstreamHandler
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||||
|
{
|
||||||
|
if (!(e.getMessage() instanceof NetMsg))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
webSession.getInstance().addSession(e);
|
||||||
|
|
||||||
|
// NetMsg msg = (NetMsg)e.getMessage();
|
||||||
|
// if (msg.getMsgID() == EMSGID.nr_read_res)
|
||||||
|
// {
|
||||||
|
// nr_Read_res msgLite = msg.getMessageLite();
|
||||||
|
// System.out.println(System.nanoTime()-msgLite.getTime());
|
||||||
|
// }
|
||||||
|
// else if (msg.getMsgID() == EMSGID.nr_write_res){
|
||||||
|
// nr_write_res msgLite = msg.getMessageLite();
|
||||||
|
// System.out.println(System.nanoTime()-msgLite.getTime());
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||||
|
{
|
||||||
|
Channel channel = e.getChannel();
|
||||||
|
webSession.getInstance().removeRequestChannel(channel);
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception
|
||||||
|
{
|
||||||
|
webSession.getInstance().addRequestChannel(e.getChannel());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
|
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||||
|
import org.jboss.netty.channel.Channels;
|
||||||
|
|
||||||
|
import server.MDecoder;
|
||||||
|
import server.MEncoder;
|
||||||
|
import server.MServerHandler;
|
||||||
|
|
||||||
|
public class MServerPipelineFactory implements ChannelPipelineFactory
|
||||||
|
{
|
||||||
|
public ChannelPipeline getPipeline() throws Exception
|
||||||
|
{
|
||||||
|
ChannelPipeline pipeline = Channels.pipeline();
|
||||||
|
|
||||||
|
pipeline.addLast("decoder", new MDecoder());
|
||||||
|
pipeline.addLast("encoder", new MEncoder());
|
||||||
|
pipeline.addLast("handler", new MServerHandler());
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
package server;
|
||||||
|
import com.google.protobuf.GeneratedMessage;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
|
import server.NetMsg;
|
||||||
|
|
||||||
|
import common.EMSGID;
|
||||||
|
import common.MessageManager;
|
||||||
|
|
||||||
|
public class NetMsg
|
||||||
|
{
|
||||||
|
EMSGID msgID;
|
||||||
|
MessageLite messageLite;
|
||||||
|
|
||||||
|
private NetMsg(){};
|
||||||
|
public static NetMsg newMessage()
|
||||||
|
{
|
||||||
|
return new NetMsg();
|
||||||
|
}
|
||||||
|
|
||||||
|
NetMsg(byte[] decoded, int id) throws Exception
|
||||||
|
{
|
||||||
|
messageLite = MessageManager.getMessage(id, decoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getBytes()
|
||||||
|
{
|
||||||
|
return messageLite.toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
public EMSGID getMsgID()
|
||||||
|
{
|
||||||
|
return msgID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMsgID(EMSGID id) {
|
||||||
|
this.msgID = id;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <T extends MessageLite> T getMessageLite()
|
||||||
|
{
|
||||||
|
return (T)messageLite;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Ḭ̈߳²È«µÄ
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void setMessageLite(GeneratedMessage.Builder builder)
|
||||||
|
{
|
||||||
|
this.messageLite = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import org.jboss.netty.bootstrap.ServerBootstrap;
|
||||||
|
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||||
|
|
||||||
|
import server.MServerPipelineFactory;
|
||||||
|
import server.Server;
|
||||||
|
|
||||||
|
public class Server
|
||||||
|
{
|
||||||
|
static Server server;
|
||||||
|
ServerBootstrap bootstrap;
|
||||||
|
|
||||||
|
public void init(int port)
|
||||||
|
{
|
||||||
|
bootstrap = new ServerBootstrap(
|
||||||
|
new NioServerSocketChannelFactory(
|
||||||
|
Executors.newCachedThreadPool(),
|
||||||
|
Executors.newCachedThreadPool()));
|
||||||
|
|
||||||
|
// Set up the default event pipeline.
|
||||||
|
bootstrap.setPipelineFactory(new MServerPipelineFactory());
|
||||||
|
bootstrap.setOption("child.tcpNoDelay", true);
|
||||||
|
bootstrap.setOption("child.keepAlive", true);
|
||||||
|
bootstrap.setOption("reuseAddress", true);
|
||||||
|
// Bind and start to accept incoming connections.
|
||||||
|
bootstrap.bind(new InetSocketAddress(port));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
bootstrap.releaseExternalResources();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Server getInstance()
|
||||||
|
{
|
||||||
|
if (server == null)
|
||||||
|
{
|
||||||
|
server = new Server();
|
||||||
|
}
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,216 @@
|
||||||
|
package server;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import javax.xml.parsers.DocumentBuilder;
|
||||||
|
import javax.xml.parsers.DocumentBuilderFactory;
|
||||||
|
import javax.xml.parsers.ParserConfigurationException;
|
||||||
|
|
||||||
|
import org.apache.log4j.PropertyConfigurator;
|
||||||
|
import org.w3c.dom.Document;
|
||||||
|
import org.w3c.dom.Element;
|
||||||
|
import org.w3c.dom.Node;
|
||||||
|
import org.w3c.dom.NodeList;
|
||||||
|
import org.xml.sax.SAXException;
|
||||||
|
|
||||||
|
import r_memcached.MemcachedMgr;
|
||||||
|
import server.ClientConfig;
|
||||||
|
import server.Server;
|
||||||
|
import common.RegisterHandler;
|
||||||
|
public class WebServerMain
|
||||||
|
{
|
||||||
|
static HashMap<Integer, ClientConfig> m_mapMemcachedClient;
|
||||||
|
public static boolean initConfig()
|
||||||
|
{
|
||||||
|
m_mapMemcachedClient = new HashMap<Integer, ClientConfig>();
|
||||||
|
File f = new File(System.getProperty("user.dir"));
|
||||||
|
String path = f.getPath() + File.separator + "bin" + File.separator;
|
||||||
|
readClientsXML(path+"client.xml");
|
||||||
|
try {
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.load(new FileInputStream(path+"config.properties"));
|
||||||
|
MemcachedMgr.nCopyNode = Integer.parseInt(properties.getProperty("replicasNum"));
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class bench extends Thread
|
||||||
|
{
|
||||||
|
private int runs;
|
||||||
|
private int threadNum;
|
||||||
|
private String object;
|
||||||
|
private String[] keys;
|
||||||
|
private int size;
|
||||||
|
private int nums;
|
||||||
|
private double rate;
|
||||||
|
|
||||||
|
public bench(int runs,int nums, int threadNum, String object, String[] keys, double rate)
|
||||||
|
{
|
||||||
|
this.runs = runs;
|
||||||
|
this.threadNum = threadNum;
|
||||||
|
this.object = object;
|
||||||
|
this.keys = keys;
|
||||||
|
this.size = object.length();
|
||||||
|
this.nums = nums;
|
||||||
|
this.rate = rate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Thread.sleep(10);
|
||||||
|
} catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
// time deletes
|
||||||
|
long time = 0;
|
||||||
|
time = System.nanoTime();
|
||||||
|
randReadWrite(rate);
|
||||||
|
time = System.nanoTime() - time;
|
||||||
|
System.out.println(time/1000000000.0f);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void randReadWrite(double scale)
|
||||||
|
{
|
||||||
|
Random randNum = new Random();
|
||||||
|
for (int i = 0; i < runs; i++)
|
||||||
|
{
|
||||||
|
if (Math.random()<scale)
|
||||||
|
{
|
||||||
|
webSession.getInstance().get(keys[randNum.nextInt(nums)]);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
webSession.getInstance().set(keys[randNum.nextInt(nums)], object);
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Thread.sleep((long) 0.00001);
|
||||||
|
} catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args)
|
||||||
|
{
|
||||||
|
PropertyConfigurator.configure(System.getProperty("user.dir")+"/bin/log4j.properties");//加载.properties文件
|
||||||
|
initConfig();
|
||||||
|
|
||||||
|
RegisterHandler.initHandler();
|
||||||
|
webSession.getInstance().start();
|
||||||
|
|
||||||
|
// client管理
|
||||||
|
MemcachedMgr clientMgr = MemcachedMgr.getInstance();
|
||||||
|
clientMgr.init(m_mapMemcachedClient);
|
||||||
|
|
||||||
|
Server requestServer = Server.getInstance();
|
||||||
|
requestServer.init(8888);
|
||||||
|
|
||||||
|
int threads = Integer.parseInt(args[0]);//线程数
|
||||||
|
int runs = Integer.parseInt(args[1]); //执行次数
|
||||||
|
int Nums = Integer.parseInt(args[2]); // key数目
|
||||||
|
int size = Integer.parseInt(args[3]); // value大小
|
||||||
|
double rate = Double.parseDouble(args[4]); //读写比例
|
||||||
|
|
||||||
|
// get object to store
|
||||||
|
byte[] obj = new byte[size];
|
||||||
|
for (int i = 0; i < size; i++)
|
||||||
|
{
|
||||||
|
obj[i] = '1';
|
||||||
|
}
|
||||||
|
String value = new String(obj);
|
||||||
|
|
||||||
|
String[] keys = new String[Nums];
|
||||||
|
for (int i = 0; i < Nums; i++)
|
||||||
|
{
|
||||||
|
keys[i] = "" + i;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < threads; i++)
|
||||||
|
{
|
||||||
|
bench b = new bench(runs, Nums, i, value, keys, rate);
|
||||||
|
b.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取memcached client配置
|
||||||
|
public static boolean readClientsXML(String str)
|
||||||
|
{
|
||||||
|
DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance();
|
||||||
|
try {
|
||||||
|
factory.setIgnoringElementContentWhitespace(true);
|
||||||
|
|
||||||
|
DocumentBuilder db=factory.newDocumentBuilder();
|
||||||
|
Document xmldoc=db.parse(new File(str));
|
||||||
|
Element elmtInfo = xmldoc.getDocumentElement();
|
||||||
|
NodeList nodes = elmtInfo.getChildNodes();
|
||||||
|
for (int i = 0; i < nodes.getLength(); i++)
|
||||||
|
{
|
||||||
|
Node result = nodes.item(i);
|
||||||
|
if (result.getNodeType() == Node.ELEMENT_NODE && result.getNodeName().equals("client"))
|
||||||
|
{
|
||||||
|
NodeList ns = result.getChildNodes();
|
||||||
|
ClientConfig localClient = new ClientConfig();
|
||||||
|
int m=0;
|
||||||
|
for (int j = 0; j < ns.getLength(); j++)
|
||||||
|
{
|
||||||
|
Node record = ns.item(j);
|
||||||
|
if (record.getNodeType() == Node.ELEMENT_NODE)
|
||||||
|
{
|
||||||
|
if (record.getNodeName().equals("id"))
|
||||||
|
{
|
||||||
|
m++;
|
||||||
|
localClient.id = Integer.decode(record.getTextContent());
|
||||||
|
}
|
||||||
|
else if (record.getNodeName().equals("host"))
|
||||||
|
{
|
||||||
|
m++;
|
||||||
|
localClient.host = record.getTextContent();
|
||||||
|
}
|
||||||
|
else if (record.getNodeName().equals("client_port"))
|
||||||
|
{
|
||||||
|
m++;
|
||||||
|
localClient.client_port = Integer.decode(record.getTextContent());
|
||||||
|
}
|
||||||
|
else if (record.getNodeName().equals("memcached"))
|
||||||
|
{
|
||||||
|
m++;
|
||||||
|
localClient.memcached = record.getTextContent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(m==4)
|
||||||
|
{
|
||||||
|
m_mapMemcachedClient.put(localClient.id, localClient);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (ParserConfigurationException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (SAXException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,333 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
import messageBody.memcachedmsg.nm_Connected;
|
||||||
|
import messageBody.memcachedmsg.nm_Connected_web_back;
|
||||||
|
import messageBody.requestMsg.nr_Connected_mem_back;
|
||||||
|
import messageBody.requestMsg.nr_Read;
|
||||||
|
import messageBody.requestMsg.nr_Read_res;
|
||||||
|
import messageBody.requestMsg.nr_write;
|
||||||
|
import messageBody.requestMsg.nr_write_res;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.Priority;
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.util.internal.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import database.DBMessage;
|
||||||
|
import r_memcached.MemcachedMgr;
|
||||||
|
import server.NetMsg;
|
||||||
|
import common.EMSGID;
|
||||||
|
|
||||||
|
public class webSession implements Runnable
|
||||||
|
{
|
||||||
|
ConcurrentLinkedQueue<MessageEvent> recvQueue = new ConcurrentLinkedQueue<MessageEvent>();
|
||||||
|
Map<Integer, Channel> MemcachedChannelMap = new ConcurrentHashMap<Integer, Channel>();
|
||||||
|
Map<Integer, Channel> RequestChannelMap = new ConcurrentHashMap<Integer, Channel>();
|
||||||
|
static webSession session = null;
|
||||||
|
public Channel curChannel;
|
||||||
|
public long totalTime = 0;
|
||||||
|
public long ticks =0;
|
||||||
|
|
||||||
|
//public static Logger log = LoggerUtil.getInstance();
|
||||||
|
public static Logger log = Logger.getLogger(webSession.class.getName());
|
||||||
|
|
||||||
|
public static webSession getInstance()
|
||||||
|
{
|
||||||
|
if (session == null)
|
||||||
|
{
|
||||||
|
session = new webSession();
|
||||||
|
}
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start()
|
||||||
|
{
|
||||||
|
//DBSession.getInstance().start(); 数据库连接
|
||||||
|
new Thread(session).start();
|
||||||
|
System.out.println("session start");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 添加request连接
|
||||||
|
public void addRequestChannel(Channel ch)
|
||||||
|
{
|
||||||
|
RequestChannelMap.put(ch.getId(), ch);
|
||||||
|
}
|
||||||
|
// 删除request连接
|
||||||
|
public void removeRequestChannel(Channel ch)
|
||||||
|
{
|
||||||
|
RequestChannelMap.remove(ch.getId());
|
||||||
|
}
|
||||||
|
// 获得request连接
|
||||||
|
public Channel getRequestChannel(Integer id)
|
||||||
|
{
|
||||||
|
return RequestChannelMap.get(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 增加client连接
|
||||||
|
public void addClientChannel(Integer num,Channel ch)
|
||||||
|
{
|
||||||
|
MemcachedChannelMap.put(num, ch);
|
||||||
|
}
|
||||||
|
public Channel getClientChannel(Integer id)
|
||||||
|
{
|
||||||
|
return MemcachedChannelMap.get(id);
|
||||||
|
}
|
||||||
|
// 删掉client连接
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void removeClientChannel(Channel ch)
|
||||||
|
{
|
||||||
|
Iterator iter = MemcachedChannelMap.entrySet().iterator();
|
||||||
|
while (iter.hasNext())
|
||||||
|
{
|
||||||
|
Entry entry = (Entry) iter.next();
|
||||||
|
if ((Channel)entry.getValue() == ch)
|
||||||
|
{
|
||||||
|
MemcachedChannelMap.remove((Integer)entry.getKey());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//////////////////////////////////////////////////////////
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
while(true)
|
||||||
|
{
|
||||||
|
MessageEvent event = recvQueue.poll();
|
||||||
|
while(event != null)
|
||||||
|
{
|
||||||
|
handle(event);
|
||||||
|
event = recvQueue.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Thread.sleep(0);
|
||||||
|
} catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public int gethashMem(String key)
|
||||||
|
{
|
||||||
|
return Math.abs(key.hashCode()%MemcachedMgr.getInstance().getSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handle(MessageEvent e)
|
||||||
|
{
|
||||||
|
NetMsg msg = (NetMsg)e.getMessage();
|
||||||
|
switch (msg.getMsgID())
|
||||||
|
{
|
||||||
|
case nm_connected:
|
||||||
|
{
|
||||||
|
nm_Connected msgBody = msg.getMessageLite();
|
||||||
|
addClientChannel(msgBody.getNum(), e.getChannel());
|
||||||
|
|
||||||
|
nm_Connected_web_back.Builder builder = nm_Connected_web_back.newBuilder();
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nm_connected_web_back);
|
||||||
|
|
||||||
|
sendMsg(e.getChannel(), send);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nr_connected_mem_back:
|
||||||
|
{
|
||||||
|
nr_Connected_mem_back msgLite= msg.getMessageLite();
|
||||||
|
addClientChannel(msgLite.getMemID(), e.getChannel());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nr_read_res:
|
||||||
|
{
|
||||||
|
nr_Read_res msgBody = msg.getMessageLite();
|
||||||
|
// System.out.println("key:"+msgBody.getKey()+" value:"+msgBody.getValue());
|
||||||
|
|
||||||
|
// System.out.println(String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0));
|
||||||
|
// log.log(Priority.INFO, String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0));
|
||||||
|
// log.log(Level.INFO, String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0));
|
||||||
|
// System.err.println((System.nanoTime()-msgBody.getTime())/1000000.0);
|
||||||
|
// if (ticks==0)
|
||||||
|
// {
|
||||||
|
// totalTime = System.currentTimeMillis();
|
||||||
|
// }
|
||||||
|
|
||||||
|
// totalTime += System.nanoTime()-msgBody.getTime();
|
||||||
|
// ticks++;
|
||||||
|
// if (ticks == 1000)
|
||||||
|
// {
|
||||||
|
// System.out.println(totalTime/1000000000.0f);
|
||||||
|
//
|
||||||
|
// totalTime = 0;
|
||||||
|
// ticks = 0;
|
||||||
|
// }
|
||||||
|
// if (msgBody.getValue().isEmpty()) //读数据库
|
||||||
|
// {
|
||||||
|
// DBMessage dbMsg=new DBMessage();
|
||||||
|
// dbMsg.mode = DBMessage.mode_query;
|
||||||
|
// dbMsg.key = msgBody.getKey();
|
||||||
|
//
|
||||||
|
// DBSession.getInstance().addDBMessage(dbMsg);
|
||||||
|
// }
|
||||||
|
// else
|
||||||
|
// {
|
||||||
|
// nc_ReadRes.Builder builder = nc_ReadRes.newBuilder();
|
||||||
|
// builder.setKey(msgBody.getKey());
|
||||||
|
// builder.setValue(msgBody.getValue());
|
||||||
|
//
|
||||||
|
// NetMsg send = NetMsg.newMessage();
|
||||||
|
// send.setMessageLite(builder);
|
||||||
|
// send.setMsgID(EMSGID.nc_read_res);
|
||||||
|
//
|
||||||
|
// //sendMsg(getRequestChannel(msgBody.getClientid()), send);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nr_write_res:
|
||||||
|
{
|
||||||
|
nr_write_res msgBody = msg.getMessageLite();
|
||||||
|
// System.out.println("key:"+msgBody.getKey()+" value:"+msgBody.getValue());
|
||||||
|
|
||||||
|
//DBMessage dbMsg = new DBMessage(); //异步写数据库
|
||||||
|
//dbMsg.mode = DBMessage.mode_set;
|
||||||
|
//dbMsg.key = msgBody.getKey();
|
||||||
|
//dbMsg.value = msgBody.getValue();
|
||||||
|
//DBSession.getInstance().addDBMessage(dbMsg);
|
||||||
|
|
||||||
|
|
||||||
|
// log.log(Level.INFO, String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0));
|
||||||
|
//System.err.println();
|
||||||
|
|
||||||
|
// System.out.println(String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0));
|
||||||
|
//
|
||||||
|
// if (ticks==0)
|
||||||
|
// {
|
||||||
|
// totalTime = System.currentTimeMillis();
|
||||||
|
// }
|
||||||
|
|
||||||
|
// totalTime += System.nanoTime()-msgBody.getTime();
|
||||||
|
// ticks++;
|
||||||
|
// if (ticks == 1000)
|
||||||
|
// {
|
||||||
|
// System.out.println(totalTime/1000000000.0f);
|
||||||
|
//
|
||||||
|
// totalTime = 0;
|
||||||
|
// ticks = 0;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// nc_WriteRes.Builder builder = nc_WriteRes.newBuilder();
|
||||||
|
// builder.setKey(msgBody.getKey());
|
||||||
|
// builder.setValue(msgBody.getValue());
|
||||||
|
//
|
||||||
|
// NetMsg send = NetMsg.newMessage();
|
||||||
|
// send.setMessageLite(builder);
|
||||||
|
// send.setMsgID(EMSGID.nc_write_res);
|
||||||
|
|
||||||
|
//sendMsg(getRequestChannel(msgBody.getClientid()), send);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
System.err.println(msg.getMsgID().toString());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
//log.log(Level.INFO, msg.getMsgID().toString());
|
||||||
|
}
|
||||||
|
public void addSession(MessageEvent e)
|
||||||
|
{
|
||||||
|
recvQueue.offer(e);
|
||||||
|
}
|
||||||
|
public boolean sendAllMsg(Integer hash, NetMsg msg)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < MemcachedMgr.nCopyNode; i++)
|
||||||
|
{
|
||||||
|
Channel eChannel = getClientChannel(hash+i);
|
||||||
|
if (eChannel != null)
|
||||||
|
{
|
||||||
|
sendMsg(eChannel, msg);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean randSendMsg2Memcached(Integer hash, NetMsg msg)
|
||||||
|
{
|
||||||
|
Random random = new Random();
|
||||||
|
int index = random.nextInt(MemcachedMgr.nCopyNode);
|
||||||
|
for (int i = 0; i < MemcachedMgr.nCopyNode; i++)
|
||||||
|
{
|
||||||
|
int num = (hash+i+index+MemcachedMgr.getInstance().getSize())%MemcachedMgr.getInstance().getSize();
|
||||||
|
Channel eChannel = getClientChannel(num);
|
||||||
|
if (eChannel != null)
|
||||||
|
{
|
||||||
|
sendMsg(eChannel, msg);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean SendMsg2Leader(Integer hash, NetMsg msg)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < MemcachedMgr.nCopyNode; i++)
|
||||||
|
{
|
||||||
|
int index = (hash+i+MemcachedMgr.getInstance().getSize())
|
||||||
|
%MemcachedMgr.getInstance().getSize();
|
||||||
|
Channel eChannel = getClientChannel(index);
|
||||||
|
if (eChannel != null)
|
||||||
|
{
|
||||||
|
sendMsg(eChannel, msg);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendMsg(Channel ch, NetMsg msg)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ch.write(msg);
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
//log.log(Level.WARNING, "send msg fail");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public boolean get(String key)
|
||||||
|
{
|
||||||
|
nr_Read.Builder builder = nr_Read.newBuilder();
|
||||||
|
builder.setKey(key);
|
||||||
|
builder.setTime(System.nanoTime());
|
||||||
|
NetMsg msg = NetMsg.newMessage();
|
||||||
|
msg.setMessageLite(builder);
|
||||||
|
msg.setMsgID(EMSGID.nr_read);
|
||||||
|
randSendMsg2Memcached(key.hashCode(), msg);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean set(String key, String value)
|
||||||
|
{
|
||||||
|
nr_write.Builder builder = nr_write.newBuilder();
|
||||||
|
builder.setKey(key);
|
||||||
|
builder.setValue(value);
|
||||||
|
builder.setTime(System.nanoTime());
|
||||||
|
NetMsg msg = NetMsg.newMessage();
|
||||||
|
msg.setMessageLite(builder);
|
||||||
|
msg.setMsgID(EMSGID.nr_write);
|
||||||
|
SendMsg2Leader(key.hashCode(), msg);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue