This commit is contained in:
lyr90329 2014-08-13 17:51:21 +08:00
parent e2bb71d18c
commit 43b0aa5847
4 changed files with 62 additions and 36 deletions

View File

@ -1,4 +1,4 @@
package server;
package com.myself.server;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
@ -7,27 +7,27 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
import common.EMSGID;
public class MDecoder extends FrameDecoder
{
public class MDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4)
{
return null;//(1)
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 8) {
return null;// (1)
}
int dataLength = buffer.getInt(buffer.readerIndex());
if (buffer.readableBytes() < dataLength + 4)
{
return null;//(2)
if (buffer.readableBytes() < dataLength + 4) {
return null;// (2)
}
buffer.skipBytes(4);//(3)
buffer.skipBytes(4);// (3)
int id = buffer.readInt();
byte[] decoded = new byte[dataLength-4];
int nodeRoute = buffer.readInt();
byte[] decoded = new byte[dataLength - 8];
buffer.readBytes(decoded);
NetMsg msg = new NetMsg(decoded, id);//(4)
NetMsg msg = new NetMsg(decoded, id);// (4)
msg.setMsgID(EMSGID.values()[id]);
msg.setNodeRoute(nodeRoute);
return msg;
}
}

View File

@ -1,4 +1,4 @@
package server;
package com.myself.server;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@ -17,10 +17,11 @@ public class MEncoder extends OneToOneEncoder
}
NetMsg res = (NetMsg)msg;
byte[] data = res.getBytes();
int dataLength = data.length+4;
int dataLength = data.length+8;
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)
buf.writeInt(dataLength);
buf.writeInt(res.msgID.ordinal());
buf.writeInt(res.getNodeRoute());
buf.writeBytes(data);
return buf;//(3)
}

View File

@ -1,19 +1,22 @@
package server;
package com.myself.server;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.MessageLite;
import common.EMSGID;
import common.MessageManager;
public class NetMsg // package different messages
public class NetMsg
{
EMSGID msgID;
MessageLite messageLite;
int nodeRoute;
private NetMsg(){};
public static NetMsg newMessage()
{
return new NetMsg();
NetMsg msg = new NetMsg();
msg.setNodeRoute(0);
return msg;
}
NetMsg(byte[] decoded, int id) throws Exception
@ -21,31 +24,38 @@ public class NetMsg // package different messages
messageLite = MessageManager.getMessage(id, decoded);
}
public byte[] getBytes() //get data in messageLite
public byte[] getBytes()
{
return messageLite.toByteArray();
}
public EMSGID getMsgID() //get message catagory
public EMSGID getMsgID()
{
return msgID;
}
public void setMsgID(EMSGID id) { //according EMSGID.java set the message ID
public void setMsgID(EMSGID id) {
this.msgID = id;
}
@SuppressWarnings("unchecked")
public <T extends MessageLite> T getMessageLite() //get messageLite
public <T extends MessageLite> T getMessageLite()
{
return (T)messageLite;
}
//Ḭ̈߳²È«µÄ
@SuppressWarnings("rawtypes")
public void setMessageLite(GeneratedMessage.Builder builder)
{
this.messageLite = builder.build();
}
public int getNodeRoute() {
return nodeRoute;
}
public void setNodeRoute(int nodeRoute) {
this.nodeRoute = nodeRoute;
}
}

View File

@ -23,6 +23,7 @@ import messageBody.requestMsg.nr_write_res;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.json.JSONObject;
import com.my.memcached.MemcachedClient;
import com.myself.client.ClientMgr;
@ -152,6 +153,10 @@ public class memSession implements Runnable {
return LockKeyMap.remove(key) != null;
}
public int getCurrentNode() {
return (int) Math.pow(2, ClientMgr.getInstance().mClientNumber);
}
public void handle(MessageEvent e) {
NetMsg msg = (NetMsg) e.getMessage();
@ -160,15 +165,17 @@ public class memSession implements Runnable {
nr_Stats msgLite = msg.getMessageLite();
@SuppressWarnings("rawtypes")
Map stats = client.stats();
JSONObject jStats = new JSONObject(stats);
if (stats != null) {
nr_Stats_res.Builder builder = nr_Stats_res.newBuilder();
builder.setKey("");
builder.setValue(stats.toString());
builder.setValue(jStats.toString());
builder.setTime(msgLite.getTime());
NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder);
send.setMsgID(EMSGID.nr_stats_res);
send.setNodeRoute(getCurrentNode());
webServeChannel.write(send);
return;
}
@ -225,7 +232,7 @@ public class memSession implements Runnable {
NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder);
send.setMsgID(EMSGID.nr_read_res);
send.setNodeRoute(getCurrentNode());
webServeChannel.write(send);
return;
} else if (state == LockKey.unLock) {
@ -239,8 +246,8 @@ public class memSession implements Runnable {
NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder);
send.setMsgID(EMSGID.nr_read_res);
send.setNodeRoute(getCurrentNode());
webServeChannel.write(send);
return;
}
}
@ -251,6 +258,7 @@ public class memSession implements Runnable {
NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder);
send.setNodeRoute(getCurrentNode());
send.setMsgID(EMSGID.nm_read);
if (sendOtherCopyMsg(gethashMem(msgLite.getKey()), send) == false) {
@ -281,6 +289,7 @@ public class memSession implements Runnable {
NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder);
send.setMsgID(EMSGID.nr_read_res);
send.setNodeRoute(getCurrentNode() + msg.getNodeRoute());
webServeChannel.write(send);
nm_read_recovery.Builder builder1 = nm_read_recovery.newBuilder();
@ -302,6 +311,7 @@ public class memSession implements Runnable {
NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder);
send.setMsgID(EMSGID.nr_read_res);
send.setNodeRoute(getCurrentNode() + msg.getNodeRoute());
webServeChannel.write(send);
}
@ -336,6 +346,7 @@ public class memSession implements Runnable {
NetMsg send2 = NetMsg.newMessage();
send2.setMessageLite(builder2);
send2.setMsgID(EMSGID.nr_write_res);
send2.setNodeRoute(getCurrentNode());
webServeChannel.write(send2);
return;
}
@ -355,6 +366,7 @@ public class memSession implements Runnable {
NetMsg send2 = NetMsg.newMessage();
send2.setMessageLite(builder2);
send2.setMsgID(EMSGID.nr_write_res);
send2.setNodeRoute(getCurrentNode());
webServeChannel.write(send2);
System.out.println("write lock conflict, please request again.");
return;
@ -381,6 +393,7 @@ public class memSession implements Runnable {
NetMsg send2 = NetMsg.newMessage();
send2.setMessageLite(builder2);
send2.setMsgID(EMSGID.nr_write_res);
send2.setNodeRoute(getCurrentNode());
webServeChannel.write(send2);
}
@ -437,6 +450,7 @@ public class memSession implements Runnable {
NetMsg send2 = NetMsg.newMessage();
send2.setMessageLite(builder2);
send2.setMsgID(EMSGID.nr_write_res);
send2.setNodeRoute(getCurrentNode() + msg.getNodeRoute());
webServeChannel.write(send2);
nm_write_2.Builder builder = nm_write_2.newBuilder();
@ -459,6 +473,7 @@ public class memSession implements Runnable {
NetMsg send2 = NetMsg.newMessage();
send2.setMessageLite(builder2);
send2.setMsgID(EMSGID.nr_write_res);
send2.setNodeRoute(getCurrentNode() + msg.getNodeRoute());
webServeChannel.write(send2);
}
}