From 43b0aa58477afa96749789d0a7d63524c815f4d7 Mon Sep 17 00:00:00 2001 From: lyr90329 Date: Wed, 13 Aug 2014 17:51:21 +0800 Subject: [PATCH] fix bug --- R-Memcached/src/server/MDecoder.java | 48 +++++++++++++------------- R-Memcached/src/server/MEncoder.java | 5 +-- R-Memcached/src/server/NetMsg.java | 24 +++++++++---- R-Memcached/src/server/memSession.java | 21 +++++++++-- 4 files changed, 62 insertions(+), 36 deletions(-) diff --git a/R-Memcached/src/server/MDecoder.java b/R-Memcached/src/server/MDecoder.java index c03c09a..5ab1448 100644 --- a/R-Memcached/src/server/MDecoder.java +++ b/R-Memcached/src/server/MDecoder.java @@ -1,4 +1,4 @@ -package server; +package com.myself.server; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; @@ -7,27 +7,27 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder; import common.EMSGID; -public class MDecoder extends FrameDecoder -{ - @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { - if (buffer.readableBytes() < 4) - { - return null;//(1) - } - int dataLength = buffer.getInt(buffer.readerIndex()); - if (buffer.readableBytes() < dataLength + 4) - { - return null;//(2) - } - - buffer.skipBytes(4);//(3) - int id = buffer.readInt(); - byte[] decoded = new byte[dataLength-4]; - - buffer.readBytes(decoded); - NetMsg msg = new NetMsg(decoded, id);//(4) - msg.setMsgID(EMSGID.values()[id]); - return msg; - } +public class MDecoder extends FrameDecoder { + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, + ChannelBuffer buffer) throws Exception { + if (buffer.readableBytes() < 8) { + return null;// (1) + } + int dataLength = buffer.getInt(buffer.readerIndex()); + if (buffer.readableBytes() < dataLength + 4) { + return null;// (2) + } + + buffer.skipBytes(4);// (3) + int id = buffer.readInt(); + int nodeRoute = buffer.readInt(); + byte[] decoded = new byte[dataLength - 8]; + + buffer.readBytes(decoded); + NetMsg msg = new NetMsg(decoded, id);// (4) + msg.setMsgID(EMSGID.values()[id]); + msg.setNodeRoute(nodeRoute); + return msg; + } } \ No newline at end of file diff --git a/R-Memcached/src/server/MEncoder.java b/R-Memcached/src/server/MEncoder.java index 3d75d1b..af3d2ec 100644 --- a/R-Memcached/src/server/MEncoder.java +++ b/R-Memcached/src/server/MEncoder.java @@ -1,4 +1,4 @@ -package server; +package com.myself.server; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -17,10 +17,11 @@ public class MEncoder extends OneToOneEncoder } NetMsg res = (NetMsg)msg; byte[] data = res.getBytes(); - int dataLength = data.length+4; + int dataLength = data.length+8; ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2) buf.writeInt(dataLength); buf.writeInt(res.msgID.ordinal()); + buf.writeInt(res.getNodeRoute()); buf.writeBytes(data); return buf;//(3) } diff --git a/R-Memcached/src/server/NetMsg.java b/R-Memcached/src/server/NetMsg.java index b363b39..b06b865 100644 --- a/R-Memcached/src/server/NetMsg.java +++ b/R-Memcached/src/server/NetMsg.java @@ -1,19 +1,22 @@ -package server; +package com.myself.server; import com.google.protobuf.GeneratedMessage; import com.google.protobuf.MessageLite; import common.EMSGID; import common.MessageManager; -public class NetMsg // package different messages +public class NetMsg { EMSGID msgID; MessageLite messageLite; + int nodeRoute; private NetMsg(){}; public static NetMsg newMessage() { - return new NetMsg(); + NetMsg msg = new NetMsg(); + msg.setNodeRoute(0); + return msg; } NetMsg(byte[] decoded, int id) throws Exception @@ -21,31 +24,38 @@ public class NetMsg // package different messages messageLite = MessageManager.getMessage(id, decoded); } - public byte[] getBytes() //get data in messageLite + public byte[] getBytes() { return messageLite.toByteArray(); } - public EMSGID getMsgID() //get message catagory + public EMSGID getMsgID() { return msgID; } - public void setMsgID(EMSGID id) { //according EMSGID.java set the message ID + public void setMsgID(EMSGID id) { this.msgID = id; } @SuppressWarnings("unchecked") - public T getMessageLite() //get messageLite + public T getMessageLite() { return (T)messageLite; } + //线程安全的 @SuppressWarnings("rawtypes") public void setMessageLite(GeneratedMessage.Builder builder) { this.messageLite = builder.build(); } + public int getNodeRoute() { + return nodeRoute; + } + public void setNodeRoute(int nodeRoute) { + this.nodeRoute = nodeRoute; + } } diff --git a/R-Memcached/src/server/memSession.java b/R-Memcached/src/server/memSession.java index 41fd94f..53d4d96 100644 --- a/R-Memcached/src/server/memSession.java +++ b/R-Memcached/src/server/memSession.java @@ -23,6 +23,7 @@ import messageBody.requestMsg.nr_write_res; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.util.internal.ConcurrentHashMap; +import org.json.JSONObject; import com.my.memcached.MemcachedClient; import com.myself.client.ClientMgr; @@ -151,6 +152,10 @@ public class memSession implements Runnable { public boolean removeLock(String key) { return LockKeyMap.remove(key) != null; } + + public int getCurrentNode() { + return (int) Math.pow(2, ClientMgr.getInstance().mClientNumber); + } public void handle(MessageEvent e) { NetMsg msg = (NetMsg) e.getMessage(); @@ -160,15 +165,17 @@ public class memSession implements Runnable { nr_Stats msgLite = msg.getMessageLite(); @SuppressWarnings("rawtypes") Map stats = client.stats(); + JSONObject jStats = new JSONObject(stats); if (stats != null) { nr_Stats_res.Builder builder = nr_Stats_res.newBuilder(); builder.setKey(""); - builder.setValue(stats.toString()); + builder.setValue(jStats.toString()); builder.setTime(msgLite.getTime()); NetMsg send = NetMsg.newMessage(); send.setMessageLite(builder); send.setMsgID(EMSGID.nr_stats_res); + send.setNodeRoute(getCurrentNode()); webServeChannel.write(send); return; } @@ -225,7 +232,7 @@ public class memSession implements Runnable { NetMsg send = NetMsg.newMessage(); send.setMessageLite(builder); send.setMsgID(EMSGID.nr_read_res); - + send.setNodeRoute(getCurrentNode()); webServeChannel.write(send); return; } else if (state == LockKey.unLock) { @@ -239,8 +246,8 @@ public class memSession implements Runnable { NetMsg send = NetMsg.newMessage(); send.setMessageLite(builder); send.setMsgID(EMSGID.nr_read_res); + send.setNodeRoute(getCurrentNode()); webServeChannel.write(send); - return; } } @@ -251,6 +258,7 @@ public class memSession implements Runnable { NetMsg send = NetMsg.newMessage(); send.setMessageLite(builder); + send.setNodeRoute(getCurrentNode()); send.setMsgID(EMSGID.nm_read); if (sendOtherCopyMsg(gethashMem(msgLite.getKey()), send) == false) { @@ -281,6 +289,7 @@ public class memSession implements Runnable { NetMsg send = NetMsg.newMessage(); send.setMessageLite(builder); send.setMsgID(EMSGID.nr_read_res); + send.setNodeRoute(getCurrentNode() + msg.getNodeRoute()); webServeChannel.write(send); nm_read_recovery.Builder builder1 = nm_read_recovery.newBuilder(); @@ -302,6 +311,7 @@ public class memSession implements Runnable { NetMsg send = NetMsg.newMessage(); send.setMessageLite(builder); send.setMsgID(EMSGID.nr_read_res); + send.setNodeRoute(getCurrentNode() + msg.getNodeRoute()); webServeChannel.write(send); } @@ -336,6 +346,7 @@ public class memSession implements Runnable { NetMsg send2 = NetMsg.newMessage(); send2.setMessageLite(builder2); send2.setMsgID(EMSGID.nr_write_res); + send2.setNodeRoute(getCurrentNode()); webServeChannel.write(send2); return; } @@ -355,6 +366,7 @@ public class memSession implements Runnable { NetMsg send2 = NetMsg.newMessage(); send2.setMessageLite(builder2); send2.setMsgID(EMSGID.nr_write_res); + send2.setNodeRoute(getCurrentNode()); webServeChannel.write(send2); System.out.println("write lock conflict, please request again."); return; @@ -381,6 +393,7 @@ public class memSession implements Runnable { NetMsg send2 = NetMsg.newMessage(); send2.setMessageLite(builder2); send2.setMsgID(EMSGID.nr_write_res); + send2.setNodeRoute(getCurrentNode()); webServeChannel.write(send2); } @@ -437,6 +450,7 @@ public class memSession implements Runnable { NetMsg send2 = NetMsg.newMessage(); send2.setMessageLite(builder2); send2.setMsgID(EMSGID.nr_write_res); + send2.setNodeRoute(getCurrentNode() + msg.getNodeRoute()); webServeChannel.write(send2); nm_write_2.Builder builder = nm_write_2.newBuilder(); @@ -459,6 +473,7 @@ public class memSession implements Runnable { NetMsg send2 = NetMsg.newMessage(); send2.setMessageLite(builder2); send2.setMsgID(EMSGID.nr_write_res); + send2.setNodeRoute(getCurrentNode() + msg.getNodeRoute()); webServeChannel.write(send2); } }