From 3680c3debcf62e0375bcb4c453cff7ea97794e09 Mon Sep 17 00:00:00 2001 From: lyr90329 Date: Tue, 5 Aug 2014 15:52:09 +0800 Subject: [PATCH] Extract variables which can be configured and fix some bug --- src/client/ClientMgr.java | 8 +++++++- src/server/LockKey.java | 2 +- src/server/MDecoder.java | 4 ++-- src/server/MEncoder.java | 4 ++-- src/server/MServerHandler.java | 8 ++++---- src/server/MemcachedMain.java | 25 ++++++++++++++++++++++++- src/server/NetMsg.java | 2 +- src/server/memSession.java | 9 +++++++-- 8 files changed, 48 insertions(+), 14 deletions(-) diff --git a/src/client/ClientMgr.java b/src/client/ClientMgr.java index a426304..d05661a 100644 --- a/src/client/ClientMgr.java +++ b/src/client/ClientMgr.java @@ -3,6 +3,7 @@ package client; import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; + import server.ClientConfig; public class ClientMgr @@ -10,7 +11,11 @@ public class ClientMgr public HashMap m_mapLocalClients; static ClientMgr clientMgr; public int mClientNumber; - public static final int nCopyNode = 4; + public static int nCopyNode; + public static int protocol; + public static final Integer twoPhaseCommit = 0; + public static final Integer paxos = (nCopyNode-1)-((nCopyNode-1)/2+1); + public static final Integer weak = nCopyNode-2; public static ClientMgr getInstance() { @@ -25,6 +30,7 @@ public class ClientMgr return m_mapLocalClients.size()+1; } + // num 为hash第一个节点 public boolean isCopyNode(Integer hash) { if ((mClientNumber-hash + m_mapLocalClients.size()+1) diff --git a/src/server/LockKey.java b/src/server/LockKey.java index 218753d..2ef798d 100644 --- a/src/server/LockKey.java +++ b/src/server/LockKey.java @@ -1,4 +1,4 @@ -package com.myself.server; +package server; public class LockKey { public Integer memNumber = 0; diff --git a/src/server/MDecoder.java b/src/server/MDecoder.java index 8b05831..c03c09a 100644 --- a/src/server/MDecoder.java +++ b/src/server/MDecoder.java @@ -1,4 +1,4 @@ -package com.myself.server; +package server; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; @@ -7,7 +7,7 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder; import common.EMSGID; -public class MDecoder extends FrameDecoder //Decoder stream +public class MDecoder extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { diff --git a/src/server/MEncoder.java b/src/server/MEncoder.java index 9b20736..3d75d1b 100644 --- a/src/server/MEncoder.java +++ b/src/server/MEncoder.java @@ -1,4 +1,4 @@ -package com.myself.server; +package server; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -6,7 +6,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; -public class MEncoder extends OneToOneEncoder //Encoder Message +public class MEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception diff --git a/src/server/MServerHandler.java b/src/server/MServerHandler.java index a681026..fc8e958 100644 --- a/src/server/MServerHandler.java +++ b/src/server/MServerHandler.java @@ -1,4 +1,4 @@ -package com.myself.server; +package server; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; @@ -8,16 +8,16 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -public class MServerHandler extends SimpleChannelUpstreamHandler +public class MServerHandler extends SimpleChannelUpstreamHandler { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) //handling received message + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { if (!(e.getMessage() instanceof NetMsg)) { return; } - // Put this Message into queue, and then handle it. + memSession.getInstance().addSession(e); } @Override diff --git a/src/server/MemcachedMain.java b/src/server/MemcachedMain.java index 59029ed..8024468 100644 --- a/src/server/MemcachedMain.java +++ b/src/server/MemcachedMain.java @@ -34,6 +34,29 @@ public class MemcachedMain { File f = new File(System.getProperty("user.dir")); String path = f.getPath() + File.separator + "bin" + File.separator; readClientsXML(path + "client.xml"); + try { + Properties properties = new Properties(); + properties.load(new FileInputStream(path+"config.properties")); + webServerHost = properties.getProperty("webServerHost").toString(); + ClientMgr.nCopyNode = Integer.parseInt(properties.getProperty("replicasNum")); + protocolName = properties.getProperty("consistencyProtocol").toString(); + if(protocolName.equals("twoPhaseCommit")){ + ClientMgr.protocol = ClientMgr.twoPhaseCommit; + }else if(protocolName.equals("paxos")){ + ClientMgr.protocol = ClientMgr.paxos; + }else if(protocolName.equals("weak")){ + ClientMgr.protocol = ClientMgr.weak; + }else{ + System.err.print("consistency protocol input error"); + return false; + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } return true; } @@ -57,7 +80,7 @@ public class MemcachedMain { clientMgr.init(num, m_mapMemcachedClient); Client webClient = new Client(); - webClient.init("192.168.3.222", 8888); + webClient.init(webServerHost, 8888); } public static void main(String[] args) { diff --git a/src/server/NetMsg.java b/src/server/NetMsg.java index 95b22cb..b363b39 100644 --- a/src/server/NetMsg.java +++ b/src/server/NetMsg.java @@ -1,4 +1,4 @@ -package com.myself.server; +package server; import com.google.protobuf.GeneratedMessage; import com.google.protobuf.MessageLite; diff --git a/src/server/memSession.java b/src/server/memSession.java index 1c11639..3956060 100644 --- a/src/server/memSession.java +++ b/src/server/memSession.java @@ -49,7 +49,7 @@ public class memSession implements Runnable { System.out.println("session start"); } - + // 增加client连接 public void addClientChannel(Integer num, Channel ch) { ClientChannelMap.put(num, ch); } @@ -58,6 +58,7 @@ public class memSession implements Runnable { return ClientChannelMap.get(id); } + // 删掉client连接 @SuppressWarnings("rawtypes") public void removeClientChannel(Channel ch) { Iterator iter = ClientChannelMap.entrySet().iterator(); @@ -403,7 +404,7 @@ public class memSession implements Runnable { case nm_write_1_res: { nm_write_1_res msgLite = msg.getMessageLite(); - if (desLockKeyCount(msgLite.getKey()) == 0) { + if (desLockKeyCount(msgLite.getKey()) == ClientMgr.protocol) { boolean res = client.set(msgLite.getKey(), msgLite.getValue()); if (res) { @@ -466,6 +467,10 @@ public class memSession implements Runnable { recvQueue.offer(e); } + public int gethashMem(String key) { + return Math.abs(key.hashCode() % ClientMgr.getInstance().getSize()); + } + public boolean sendOtherCopyMsg(Integer hash, NetMsg msg) { for (int i = 0; i < ClientMgr.nCopyNode; i++) { Integer index = (hash + i + ClientMgr.getInstance().getSize())