change dir
This commit is contained in:
parent
e5425d082d
commit
448802c7fd
|
@ -0,0 +1,79 @@
|
||||||
|
option java_package = "messageBody";
|
||||||
|
option java_outer_classname="memcachedmsg";
|
||||||
|
|
||||||
|
message Test1
|
||||||
|
{
|
||||||
|
optional string key=1;//string delegate String
|
||||||
|
}
|
||||||
|
message messageTest//内部类名
|
||||||
|
{
|
||||||
|
//optional int32 a = 1; 中1这样的序列号在一个类不能重复
|
||||||
|
optional int32 a = 1;//optional代表可选 一般写这个 int32代表java的int
|
||||||
|
optional int64 b = 2;//int64代表java的Long
|
||||||
|
optional string key=3;//string 代表String
|
||||||
|
optional bool d = 4;//bool代表boolean
|
||||||
|
repeated int32 e=5;//代表 List<Integer>
|
||||||
|
optional Test1 f=6;// 嵌套对象
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_Connected
|
||||||
|
{
|
||||||
|
optional int32 num = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_Connected_mem_back
|
||||||
|
{
|
||||||
|
optional int32 num = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_Connected_web_back
|
||||||
|
{
|
||||||
|
optional int32 num = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
message nm_read
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_read_recovery
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_write_1
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
optional int32 memID=4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_write_1_res
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
optional int32 memID=4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_write_2
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
optional int32 memID=4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nm_write_2_res
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
optional int32 memID=4;
|
||||||
|
}
|
|
@ -0,0 +1,42 @@
|
||||||
|
option java_package = "messageBody";//°üÃû
|
||||||
|
option java_outer_classname="requestMsg"; //ˈ̟
|
||||||
|
|
||||||
|
// Á¬½Ómemcached server
|
||||||
|
message nr_Connected_mem
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
message nr_Connected_mem_back
|
||||||
|
{
|
||||||
|
optional int32 memID=1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nr_Read
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional int32 clientid=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nr_Read_res
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nr_write
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message nr_write_res
|
||||||
|
{
|
||||||
|
optional string key=1;
|
||||||
|
optional string value=2;
|
||||||
|
optional int64 time=3;
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<clients>
|
||||||
|
<client>
|
||||||
|
<id>0000</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30000</client_port>
|
||||||
|
<memcached>127.0.0.1:20000</memcached>
|
||||||
|
</client>
|
||||||
|
<client>
|
||||||
|
<id>0001</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30001</client_port>
|
||||||
|
<memcached>127.0.0.1:20001</memcached>
|
||||||
|
</client>
|
||||||
|
<client>
|
||||||
|
<id>0002</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30002</client_port>
|
||||||
|
<memcached>127.0.0.1:20002</memcached>
|
||||||
|
</client>
|
||||||
|
<client>
|
||||||
|
<id>0003</id>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<client_port>30003</client_port>
|
||||||
|
<memcached>127.0.0.1:20003</memcached>
|
||||||
|
</client>
|
||||||
|
</clients>
|
|
@ -0,0 +1,39 @@
|
||||||
|
package client;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import org.jboss.netty.bootstrap.ClientBootstrap;
|
||||||
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
|
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
||||||
|
|
||||||
|
public class Client
|
||||||
|
{
|
||||||
|
ClientBootstrap bootstrap;
|
||||||
|
ChannelFuture channelFuture;
|
||||||
|
String host;
|
||||||
|
int port;
|
||||||
|
int id;
|
||||||
|
|
||||||
|
public boolean init(String host, int port)
|
||||||
|
{
|
||||||
|
bootstrap = new ClientBootstrap(
|
||||||
|
new NioClientSocketChannelFactory(
|
||||||
|
Executors.newCachedThreadPool(),
|
||||||
|
Executors.newCachedThreadPool()));
|
||||||
|
// Set up the event pipeline factory.
|
||||||
|
bootstrap.setPipelineFactory(new MClientPipelineFactory());
|
||||||
|
// Start the connection attempt.
|
||||||
|
channelFuture = bootstrap.connect(new InetSocketAddress(host, port));
|
||||||
|
return channelFuture.isSuccess();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
channelFuture.awaitUninterruptibly();
|
||||||
|
if (!channelFuture.isSuccess())
|
||||||
|
{
|
||||||
|
channelFuture.getCause().printStackTrace();
|
||||||
|
}
|
||||||
|
channelFuture.getChannel().getCloseFuture().awaitUninterruptibly();
|
||||||
|
bootstrap.releaseExternalResources();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,120 @@
|
||||||
|
package client;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import server.ClientConfig;
|
||||||
|
|
||||||
|
public class ClientMgr
|
||||||
|
{
|
||||||
|
public HashMap<Integer, Client> m_mapLocalClients;
|
||||||
|
static ClientMgr clientMgr;
|
||||||
|
public int mClientNumber;
|
||||||
|
public static int nCopyNode;
|
||||||
|
public static int protocol;
|
||||||
|
public static final Integer twoPhaseCommit = 0;
|
||||||
|
public static final Integer paxos = (nCopyNode-1)-((nCopyNode-1)/2+1);
|
||||||
|
public static final Integer weak = nCopyNode-2;
|
||||||
|
|
||||||
|
public static ClientMgr getInstance()
|
||||||
|
{
|
||||||
|
if (clientMgr == null)
|
||||||
|
{
|
||||||
|
clientMgr = new ClientMgr();
|
||||||
|
}
|
||||||
|
return clientMgr;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getSize() {
|
||||||
|
return m_mapLocalClients.size()+1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// num ΪhashµÚÒ»¸ö½Úµã
|
||||||
|
public boolean isCopyNode(Integer hash)
|
||||||
|
{
|
||||||
|
if ((mClientNumber-hash + m_mapLocalClients.size()+1)
|
||||||
|
%(m_mapLocalClients.size()+1)<nCopyNode)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void init(int num, HashMap<Integer, ClientConfig> hm)
|
||||||
|
{
|
||||||
|
m_mapLocalClients = new HashMap<Integer, Client>();
|
||||||
|
mClientNumber = num;
|
||||||
|
|
||||||
|
Iterator iter = hm.entrySet().iterator();
|
||||||
|
while (iter.hasNext())
|
||||||
|
{
|
||||||
|
Entry entry = (Entry) iter.next();
|
||||||
|
ClientConfig cc = (ClientConfig)entry.getValue();
|
||||||
|
if (cc.id != mClientNumber)
|
||||||
|
{
|
||||||
|
Client lc = new Client();
|
||||||
|
lc.host = cc.host;
|
||||||
|
lc.port = cc.client_port;
|
||||||
|
lc.id = cc.id;
|
||||||
|
m_mapLocalClients.put(lc.id, lc);
|
||||||
|
if(lc.init(lc.host, lc.port))
|
||||||
|
{
|
||||||
|
System.out.println("client connected successful");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public String getCopyHost(Integer number, Integer index)
|
||||||
|
{
|
||||||
|
Client client =m_mapLocalClients.get((number+index)%getSize());
|
||||||
|
if (client==null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return client.host+client.port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCurCoyhost(Integer number)
|
||||||
|
{
|
||||||
|
Client client =m_mapLocalClients.get((number+mClientNumber)%m_mapLocalClients.size());
|
||||||
|
if (client==null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return client.host+client.port;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public Integer getClientNum(String host)
|
||||||
|
{
|
||||||
|
Iterator iter = m_mapLocalClients.entrySet().iterator();
|
||||||
|
while (iter.hasNext())
|
||||||
|
{
|
||||||
|
Entry entry = (Entry) iter.next();
|
||||||
|
Client client = (Client)entry.getValue();
|
||||||
|
if (host.compareTo("/"+client.host+":"+client.port) == 0)
|
||||||
|
{
|
||||||
|
return client.id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void connect(String host)
|
||||||
|
{
|
||||||
|
Iterator iter = m_mapLocalClients.entrySet().iterator();
|
||||||
|
while (iter.hasNext())
|
||||||
|
{
|
||||||
|
Entry entry = (Entry) iter.next();
|
||||||
|
Client client = (Client)entry.getValue();
|
||||||
|
if (client.host == host)
|
||||||
|
{
|
||||||
|
client.init(client.host, client.port);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
package client;
|
||||||
|
|
||||||
|
import messageBody.memcachedmsg.nm_Connected;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
|
import org.jboss.netty.channel.ExceptionEvent;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||||
|
|
||||||
|
import server.NetMsg;
|
||||||
|
import common.EMSGID;
|
||||||
|
|
||||||
|
|
||||||
|
public class MClientHandler extends SimpleChannelUpstreamHandler
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||||
|
{
|
||||||
|
nm_Connected.Builder builder = nm_Connected.newBuilder();
|
||||||
|
builder.setNum(ClientMgr.getInstance().mClientNumber);
|
||||||
|
|
||||||
|
NetMsg sendMsg = NetMsg.newMessage();
|
||||||
|
sendMsg.setMsgID(EMSGID.nm_connected);
|
||||||
|
sendMsg.setMessageLite(builder);
|
||||||
|
e.getChannel().write(sendMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||||
|
{
|
||||||
|
if (!(e.getMessage() instanceof NetMsg))
|
||||||
|
{
|
||||||
|
return;//(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||||
|
{
|
||||||
|
if (e.getChannel().getLocalAddress() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
package client;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
|
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||||
|
import org.jboss.netty.channel.Channels;
|
||||||
|
|
||||||
|
import server.MDecoder;
|
||||||
|
import server.MEncoder;
|
||||||
|
|
||||||
|
public class MClientPipelineFactory implements ChannelPipelineFactory
|
||||||
|
{
|
||||||
|
|
||||||
|
public ChannelPipeline getPipeline() throws Exception
|
||||||
|
{
|
||||||
|
ChannelPipeline pipeline = Channels.pipeline();
|
||||||
|
|
||||||
|
pipeline.addLast("decoder", new MDecoder());
|
||||||
|
pipeline.addLast("encoder", new MEncoder());
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
package common;
|
||||||
|
public enum EMSGID
|
||||||
|
{
|
||||||
|
nr_read, //WebApp send read request to R-Memcached
|
||||||
|
nr_read_res, //R-Memcached respond read request from WebServer
|
||||||
|
nr_write, //WebApp send write request to R-Memcached
|
||||||
|
nr_write_copy,
|
||||||
|
nr_write_res, //R-Memcached respond write request from WebServer
|
||||||
|
nr_connected_mem, //Init connection channel between WebApp and R-Memcached
|
||||||
|
nr_connected_mem_back,
|
||||||
|
|
||||||
|
nm_connected, //Init connection channel between R-Memcached and R-Memcached
|
||||||
|
nm_connected_mem_back,
|
||||||
|
nm_connected_web_back,
|
||||||
|
nm_read, //R-Memcached send read request to another R-Memcached
|
||||||
|
nm_read_recovery, //data recovery
|
||||||
|
nm_write_1, //write phase 1
|
||||||
|
nm_write_1_res,
|
||||||
|
nm_write_2 //write phase 2
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package common;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.google.protobuf.GeneratedMessage;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Yanran Lu
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MessageManager { //create messageMap to manage all messages
|
||||||
|
private static Map<Integer, MessageLite> messageMap = new HashMap<Integer, MessageLite>();
|
||||||
|
|
||||||
|
public static void addMessageCla(int id, //add message to map
|
||||||
|
Class<? extends GeneratedMessage> msgCla)
|
||||||
|
throws NoSuchMethodException, SecurityException,
|
||||||
|
IllegalAccessException, IllegalArgumentException,
|
||||||
|
InvocationTargetException {
|
||||||
|
if (msgCla == null)
|
||||||
|
return;
|
||||||
|
Method method = msgCla.getMethod("getDefaultInstance");
|
||||||
|
MessageLite lite = (MessageLite) method.invoke(null, null);
|
||||||
|
messageMap.put(id, lite);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static MessageLite getMessage(int id, byte[] body) //get message from map
|
||||||
|
throws InvalidProtocolBufferException {
|
||||||
|
MessageLite list = messageMap.get(id);
|
||||||
|
if (list == null) {
|
||||||
|
System.err.printf("msg %d no register", id);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return list.newBuilderForType().mergeFrom(body).build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
package common;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
|
||||||
|
import messageBody.memcachedmsg.nm_Connected;
|
||||||
|
import messageBody.memcachedmsg.nm_Connected_mem_back;
|
||||||
|
import messageBody.memcachedmsg.nm_read;
|
||||||
|
import messageBody.memcachedmsg.nm_read_recovery;
|
||||||
|
import messageBody.memcachedmsg.nm_write_1;
|
||||||
|
import messageBody.memcachedmsg.nm_write_1_res;
|
||||||
|
import messageBody.memcachedmsg.nm_write_2;
|
||||||
|
import messageBody.requestMsg.nr_Connected_mem;
|
||||||
|
import messageBody.requestMsg.nr_Read;
|
||||||
|
import messageBody.requestMsg.nr_write;
|
||||||
|
|
||||||
|
import com.google.protobuf.GeneratedMessage;
|
||||||
|
|
||||||
|
public class RegisterHandler {
|
||||||
|
public static void initHandler() // register all messages created by
|
||||||
|
// protobuf and will be handled by R-Memcached
|
||||||
|
{
|
||||||
|
initHandler(EMSGID.nm_connected.ordinal(), nm_Connected.class);
|
||||||
|
initHandler(EMSGID.nm_connected_mem_back.ordinal(),
|
||||||
|
nm_Connected_mem_back.class);
|
||||||
|
initHandler(EMSGID.nr_connected_mem.ordinal(), nr_Connected_mem.class);
|
||||||
|
initHandler(EMSGID.nr_read.ordinal(), nr_Read.class);
|
||||||
|
initHandler(EMSGID.nm_read.ordinal(), nm_read.class);
|
||||||
|
initHandler(EMSGID.nm_read_recovery.ordinal(), nm_read_recovery.class);
|
||||||
|
initHandler(EMSGID.nr_write.ordinal(), nr_write.class);
|
||||||
|
initHandler(EMSGID.nm_write_1.ordinal(), nm_write_1.class);
|
||||||
|
initHandler(EMSGID.nm_write_1_res.ordinal(), nm_write_1_res.class);
|
||||||
|
initHandler(EMSGID.nm_write_2.ordinal(), nm_write_2.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void initHandler(int id,
|
||||||
|
Class<? extends GeneratedMessage> msgCla) {
|
||||||
|
try {
|
||||||
|
MessageManager.addMessageCla(id, msgCla);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,3 @@
|
||||||
|
webServerHost = 192.168.3.222
|
||||||
|
replicasNum = 4
|
||||||
|
consistencyProtocol =twoPhaseCommit
|
|
@ -0,0 +1,4 @@
|
||||||
|
log4j.rootLogger=ERROR, stdout
|
||||||
|
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||||
|
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
|
|
@ -0,0 +1,117 @@
|
||||||
|
package cn.edu.buaa.act.memcachedClient;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public final class ByteBufArrayInputStream extends InputStream implements
|
||||||
|
LineInputStream {
|
||||||
|
private ByteBuffer[] bufs;
|
||||||
|
private int currentBuf = 0;
|
||||||
|
|
||||||
|
public ByteBufArrayInputStream(List<ByteBuffer> bufs) throws Exception {
|
||||||
|
this(bufs.toArray(new ByteBuffer[] {}));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ByteBufArrayInputStream(ByteBuffer[] bufs) throws Exception {
|
||||||
|
if (bufs == null || bufs.length == 0)
|
||||||
|
throw new Exception("buffer is empty");
|
||||||
|
|
||||||
|
this.bufs = bufs;
|
||||||
|
for (ByteBuffer b : bufs)
|
||||||
|
b.flip();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int read() {
|
||||||
|
do {
|
||||||
|
if (bufs[currentBuf].hasRemaining())
|
||||||
|
return bufs[currentBuf].get();
|
||||||
|
currentBuf++;
|
||||||
|
} while (currentBuf < bufs.length);
|
||||||
|
|
||||||
|
currentBuf--;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int read(byte[] buf) {
|
||||||
|
int len = buf.length;
|
||||||
|
int bufPos = 0;
|
||||||
|
do {
|
||||||
|
if (bufs[currentBuf].hasRemaining()) {
|
||||||
|
int n = Math.min(bufs[currentBuf].remaining(), len - bufPos);
|
||||||
|
bufs[currentBuf].get(buf, bufPos, n);
|
||||||
|
bufPos += n;
|
||||||
|
}
|
||||||
|
currentBuf++;
|
||||||
|
} while (currentBuf < bufs.length && bufPos < len);
|
||||||
|
|
||||||
|
currentBuf--;
|
||||||
|
|
||||||
|
if (bufPos > 0 || (bufPos == 0 && len == 0))
|
||||||
|
return bufPos;
|
||||||
|
else
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String readLine() throws IOException {
|
||||||
|
byte[] b = new byte[1];
|
||||||
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
boolean eol = false;
|
||||||
|
|
||||||
|
while (read(b, 0, 1) != -1) {
|
||||||
|
if (b[0] == 13) {
|
||||||
|
eol = true;
|
||||||
|
} else {
|
||||||
|
if (eol) {
|
||||||
|
if (b[0] == 10)
|
||||||
|
break;
|
||||||
|
eol = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cast byte into char array
|
||||||
|
bos.write(b, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bos == null || bos.size() <= 0) {
|
||||||
|
throw new IOException(
|
||||||
|
"++++ Stream appears to be dead, so closing it down");
|
||||||
|
}
|
||||||
|
|
||||||
|
// else return the string
|
||||||
|
return bos.toString().trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearEOL() throws IOException {
|
||||||
|
byte[] b = new byte[1];
|
||||||
|
boolean eol = false;
|
||||||
|
while (read(b, 0, 1) != -1) {
|
||||||
|
|
||||||
|
// only stop when we see
|
||||||
|
// \r (13) followed by \n (10)
|
||||||
|
if (b[0] == 13) {
|
||||||
|
eol = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eol) {
|
||||||
|
if (b[0] == 10)
|
||||||
|
break;
|
||||||
|
eol = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder("ByteBufArrayIS: ");
|
||||||
|
sb.append(bufs.length).append(" bufs of sizes: \n");
|
||||||
|
|
||||||
|
for (int i = 0; i < bufs.length; i++) {
|
||||||
|
sb.append(" ").append(i)
|
||||||
|
.append(": ").append(bufs[i]).append("\n");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package cn.edu.buaa.act.memcachedClient;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
|
||||||
|
public class ContextObjectInputStream extends ObjectInputStream {
|
||||||
|
|
||||||
|
ClassLoader mLoader;
|
||||||
|
|
||||||
|
public ContextObjectInputStream(InputStream in, ClassLoader loader)
|
||||||
|
throws IOException, SecurityException {
|
||||||
|
super(in);
|
||||||
|
mLoader = loader;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Class resolveClass(ObjectStreamClass v) throws IOException,
|
||||||
|
ClassNotFoundException {
|
||||||
|
if (mLoader == null)
|
||||||
|
return super.resolveClass(v);
|
||||||
|
else
|
||||||
|
return Class.forName(v.getName(), true, mLoader);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
package cn.edu.buaa.act.memcachedClient;
|
||||||
|
|
||||||
|
public interface ErrorHandler {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called for errors thrown during initialization.
|
||||||
|
*/
|
||||||
|
public void handleErrorOnInit( final MemcachedClient client ,
|
||||||
|
final Throwable error );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called for errors thrown during {@link MemcachedClient#get(String)} and related methods.
|
||||||
|
*/
|
||||||
|
public void handleErrorOnGet( final MemcachedClient client ,
|
||||||
|
final Throwable error ,
|
||||||
|
final String cacheKey );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called for errors thrown during {@link MemcachedClient#getMulti(String)} and related methods.
|
||||||
|
*/
|
||||||
|
public void handleErrorOnGet( final MemcachedClient client ,
|
||||||
|
final Throwable error ,
|
||||||
|
final String[] cacheKeys );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called for errors thrown during {@link MemcachedClient#set(String,Object)} and related methods.
|
||||||
|
*/
|
||||||
|
public void handleErrorOnSet( final MemcachedClient client ,
|
||||||
|
final Throwable error ,
|
||||||
|
final String cacheKey );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called for errors thrown during {@link MemcachedClient#delete(String)} and related methods.
|
||||||
|
*/
|
||||||
|
public void handleErrorOnDelete( final MemcachedClient client ,
|
||||||
|
final Throwable error ,
|
||||||
|
final String cacheKey );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called for errors thrown during {@link MemcachedClient#flushAll()} and related methods.
|
||||||
|
*/
|
||||||
|
public void handleErrorOnFlush( final MemcachedClient client ,
|
||||||
|
final Throwable error );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called for errors thrown during {@link MemcachedClient#stats()} and related methods.
|
||||||
|
*/
|
||||||
|
public void handleErrorOnStats( final MemcachedClient client ,
|
||||||
|
final Throwable error);
|
||||||
|
|
||||||
|
} // interface
|
|
@ -0,0 +1,28 @@
|
||||||
|
package cn.edu.buaa.act.memcachedClient;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public interface LineInputStream {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read everything up to the next end-of-line. Does not include the end of
|
||||||
|
* line, though it is consumed from the input.
|
||||||
|
*
|
||||||
|
* @return All next up to the next end of line.
|
||||||
|
*/
|
||||||
|
public String readLine() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read everything up to and including the end of line.
|
||||||
|
*/
|
||||||
|
public void clearEOL() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read some bytes.
|
||||||
|
*
|
||||||
|
* @param buf
|
||||||
|
* The buffer into which read.
|
||||||
|
* @return The number of bytes actually read, or -1 if none could be read.
|
||||||
|
*/
|
||||||
|
public int read(byte[] buf) throws IOException;
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,412 @@
|
||||||
|
package cn.edu.buaa.act.memcachedClient;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle encoding standard Java types directly which can result in significant
|
||||||
|
* memory savings:
|
||||||
|
*
|
||||||
|
* Currently the Memcached driver for Java supports the setSerialize() option.
|
||||||
|
* This can increase performance in some situations but has a few issues:
|
||||||
|
*
|
||||||
|
* Code that performs class casting will throw ClassCastExceptions when
|
||||||
|
* setSerialize is enabled. For example:
|
||||||
|
*
|
||||||
|
* mc.set( "foo", new Integer( 1 ) ); Integer output = (Integer)mc.get("foo");
|
||||||
|
*
|
||||||
|
* Will work just file when setSerialize is true but when its false will just
|
||||||
|
* throw a ClassCastException.
|
||||||
|
*
|
||||||
|
* Also internally it doesn't support Boolean and since toString is called
|
||||||
|
* wastes a lot of memory and causes additional performance issue. For example
|
||||||
|
* an Integer can take anywhere from 1 byte to 10 bytes.
|
||||||
|
*
|
||||||
|
* Due to the way the memcached slab allocator works it seems like a LOT of
|
||||||
|
* wasted memory to store primitive types as serialized objects (from a
|
||||||
|
* performance and memory perspective). In our applications we have millions of
|
||||||
|
* small objects and wasted memory would become a big problem.
|
||||||
|
*
|
||||||
|
* For example a Serialized Boolean takes 47 bytes which means it will fit into
|
||||||
|
* the 64byte LRU. Using 1 byte means it will fit into the 8 byte LRU thus
|
||||||
|
* saving 8x the memory. This also saves the CPU performance since we don't have
|
||||||
|
* to serialize bytes back and forth and we can compute the byte[] value
|
||||||
|
* directly.
|
||||||
|
*
|
||||||
|
* One problem would be when the user calls get() because doing so would require
|
||||||
|
* the app to know the type of the object stored as a bytearray inside memcached
|
||||||
|
* (since the user will probably cast).
|
||||||
|
*
|
||||||
|
* If we assume the basic types are interned we could use the first byte as the
|
||||||
|
* type with the remaining bytes as the value. Then on get() we could read the
|
||||||
|
* first byte to determine the type and then construct the correct object for
|
||||||
|
* it. This would prevent the ClassCastException I talked about above.
|
||||||
|
*
|
||||||
|
* We could remove the setSerialize() option and just assume that standard VM
|
||||||
|
* types are always internd in this manner.
|
||||||
|
*
|
||||||
|
* mc.set( "foo", new Boolean.TRUE ); Boolean b = (Boolean)mc.get( "foo" );
|
||||||
|
*
|
||||||
|
* And the type casts would work because internally we would create a new
|
||||||
|
* Boolean to return back to the client.
|
||||||
|
*
|
||||||
|
* This would reduce memory footprint and allow for a virtual implementation of
|
||||||
|
* the Externalizable interface which is much faster than Serialzation.
|
||||||
|
*
|
||||||
|
* Currently the memory improvements would be:
|
||||||
|
*
|
||||||
|
* java.lang.Boolean - 8x performance improvement (now just two bytes)
|
||||||
|
* java.lang.Integer - 16x performance improvement (now just 5 bytes)
|
||||||
|
*
|
||||||
|
* Most of the other primitive types would benefit from this optimization.
|
||||||
|
* java.lang.Character being another obvious example.
|
||||||
|
*
|
||||||
|
* I know it seems like I'm being really picky here but for our application I'd
|
||||||
|
* save 1G of memory right off the bat. We'd go down from 1.152G of memory used
|
||||||
|
* down to 144M of memory used which is much better IMO.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class NativeHandler {
|
||||||
|
|
||||||
|
// logger
|
||||||
|
private static Logger log = Logger.getLogger(NativeHandler.class.getName());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Detemine of object can be natively serialized by this class.
|
||||||
|
*
|
||||||
|
* @param value
|
||||||
|
* Object to test.
|
||||||
|
* @return true/false
|
||||||
|
*/
|
||||||
|
public static boolean isHandled(Object value) {
|
||||||
|
|
||||||
|
return (value instanceof Byte || value instanceof Boolean
|
||||||
|
|| value instanceof Integer || value instanceof Long
|
||||||
|
|| value instanceof Character || value instanceof String
|
||||||
|
|| value instanceof StringBuffer || value instanceof Float
|
||||||
|
|| value instanceof Short || value instanceof Double
|
||||||
|
|| value instanceof Date || value instanceof StringBuilder || value instanceof byte[]) ? true
|
||||||
|
: false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the flag for marking the type of the byte array.
|
||||||
|
*
|
||||||
|
* @param value
|
||||||
|
* Object we are storing.
|
||||||
|
* @return int marker
|
||||||
|
*/
|
||||||
|
public static int getMarkerFlag(Object value) {
|
||||||
|
|
||||||
|
if (value instanceof Byte)
|
||||||
|
return MemcachedClient.MARKER_BYTE;
|
||||||
|
|
||||||
|
if (value instanceof Boolean)
|
||||||
|
return MemcachedClient.MARKER_BOOLEAN;
|
||||||
|
|
||||||
|
if (value instanceof Integer)
|
||||||
|
return MemcachedClient.MARKER_INTEGER;
|
||||||
|
|
||||||
|
if (value instanceof Long)
|
||||||
|
return MemcachedClient.MARKER_LONG;
|
||||||
|
|
||||||
|
if (value instanceof Character)
|
||||||
|
return MemcachedClient.MARKER_CHARACTER;
|
||||||
|
|
||||||
|
if (value instanceof String)
|
||||||
|
return MemcachedClient.MARKER_STRING;
|
||||||
|
|
||||||
|
if (value instanceof StringBuffer)
|
||||||
|
return MemcachedClient.MARKER_STRINGBUFFER;
|
||||||
|
|
||||||
|
if (value instanceof Float)
|
||||||
|
return MemcachedClient.MARKER_FLOAT;
|
||||||
|
|
||||||
|
if (value instanceof Short)
|
||||||
|
return MemcachedClient.MARKER_SHORT;
|
||||||
|
|
||||||
|
if (value instanceof Double)
|
||||||
|
return MemcachedClient.MARKER_DOUBLE;
|
||||||
|
|
||||||
|
if (value instanceof Date)
|
||||||
|
return MemcachedClient.MARKER_DATE;
|
||||||
|
|
||||||
|
if (value instanceof StringBuilder)
|
||||||
|
return MemcachedClient.MARKER_STRINGBUILDER;
|
||||||
|
|
||||||
|
if (value instanceof byte[])
|
||||||
|
return MemcachedClient.MARKER_BYTEARR;
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encodes supported types
|
||||||
|
*
|
||||||
|
* @param value
|
||||||
|
* Object to encode.
|
||||||
|
* @return byte array
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
* If fail to encode.
|
||||||
|
*/
|
||||||
|
public static byte[] encode(Object value) throws Exception {
|
||||||
|
|
||||||
|
if (value instanceof Byte)
|
||||||
|
return encode((Byte) value);
|
||||||
|
|
||||||
|
if (value instanceof Boolean)
|
||||||
|
return encode((Boolean) value);
|
||||||
|
|
||||||
|
if (value instanceof Integer)
|
||||||
|
return encode(((Integer) value).intValue());
|
||||||
|
|
||||||
|
if (value instanceof Long)
|
||||||
|
return encode(((Long) value).longValue());
|
||||||
|
|
||||||
|
if (value instanceof Character)
|
||||||
|
return encode((Character) value);
|
||||||
|
|
||||||
|
if (value instanceof String)
|
||||||
|
return encode((String) value);
|
||||||
|
|
||||||
|
if (value instanceof StringBuffer)
|
||||||
|
return encode((StringBuffer) value);
|
||||||
|
|
||||||
|
if (value instanceof Float)
|
||||||
|
return encode(((Float) value).floatValue());
|
||||||
|
|
||||||
|
if (value instanceof Short)
|
||||||
|
return encode((Short) value);
|
||||||
|
|
||||||
|
if (value instanceof Double)
|
||||||
|
return encode(((Double) value).doubleValue());
|
||||||
|
|
||||||
|
if (value instanceof Date)
|
||||||
|
return encode((Date) value);
|
||||||
|
|
||||||
|
if (value instanceof StringBuilder)
|
||||||
|
return encode((StringBuilder) value);
|
||||||
|
|
||||||
|
if (value instanceof byte[])
|
||||||
|
return encode((byte[]) value);
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(Byte value) {
|
||||||
|
byte[] b = new byte[1];
|
||||||
|
b[0] = value.byteValue();
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(Boolean value) {
|
||||||
|
byte[] b = new byte[1];
|
||||||
|
|
||||||
|
if (value.booleanValue())
|
||||||
|
b[0] = 1;
|
||||||
|
else
|
||||||
|
b[0] = 0;
|
||||||
|
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(int value) {
|
||||||
|
return getBytes(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(long value) throws Exception {
|
||||||
|
return getBytes(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(Date value) {
|
||||||
|
return getBytes(value.getTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(Character value) {
|
||||||
|
return encode(value.charValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(String value) throws Exception {
|
||||||
|
return value.getBytes("UTF-8");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(StringBuffer value) throws Exception {
|
||||||
|
return encode(value.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(float value) throws Exception {
|
||||||
|
return encode((int) Float.floatToIntBits(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(Short value) throws Exception {
|
||||||
|
return encode((int) value.shortValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(double value) throws Exception {
|
||||||
|
return encode((long) Double.doubleToLongBits(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(StringBuilder value) throws Exception {
|
||||||
|
return encode(value.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] encode(byte[] value) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] getBytes(long value) {
|
||||||
|
byte[] b = new byte[8];
|
||||||
|
b[0] = (byte) ((value >> 56) & 0xFF);
|
||||||
|
b[1] = (byte) ((value >> 48) & 0xFF);
|
||||||
|
b[2] = (byte) ((value >> 40) & 0xFF);
|
||||||
|
b[3] = (byte) ((value >> 32) & 0xFF);
|
||||||
|
b[4] = (byte) ((value >> 24) & 0xFF);
|
||||||
|
b[5] = (byte) ((value >> 16) & 0xFF);
|
||||||
|
b[6] = (byte) ((value >> 8) & 0xFF);
|
||||||
|
b[7] = (byte) ((value >> 0) & 0xFF);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] getBytes(int value) {
|
||||||
|
byte[] b = new byte[4];
|
||||||
|
b[0] = (byte) ((value >> 24) & 0xFF);
|
||||||
|
b[1] = (byte) ((value >> 16) & 0xFF);
|
||||||
|
b[2] = (byte) ((value >> 8) & 0xFF);
|
||||||
|
b[3] = (byte) ((value >> 0) & 0xFF);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decodes byte array using memcache flag to determine type.
|
||||||
|
*
|
||||||
|
* @param b
|
||||||
|
* @param marker
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public static Object decode(byte[] b, int flag) throws Exception {
|
||||||
|
|
||||||
|
if (b.length < 1)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_BYTE) == MemcachedClient.MARKER_BYTE)
|
||||||
|
return decodeByte(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_BOOLEAN) == MemcachedClient.MARKER_BOOLEAN)
|
||||||
|
return decodeBoolean(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_INTEGER) == MemcachedClient.MARKER_INTEGER)
|
||||||
|
return decodeInteger(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_LONG) == MemcachedClient.MARKER_LONG)
|
||||||
|
return decodeLong(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_CHARACTER) == MemcachedClient.MARKER_CHARACTER)
|
||||||
|
return decodeCharacter(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_STRING) == MemcachedClient.MARKER_STRING)
|
||||||
|
return decodeString(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_STRINGBUFFER) == MemcachedClient.MARKER_STRINGBUFFER)
|
||||||
|
return decodeStringBuffer(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_FLOAT) == MemcachedClient.MARKER_FLOAT)
|
||||||
|
return decodeFloat(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_SHORT) == MemcachedClient.MARKER_SHORT)
|
||||||
|
return decodeShort(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_DOUBLE) == MemcachedClient.MARKER_DOUBLE)
|
||||||
|
return decodeDouble(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_DATE) == MemcachedClient.MARKER_DATE)
|
||||||
|
return decodeDate(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_STRINGBUILDER) == MemcachedClient.MARKER_STRINGBUILDER)
|
||||||
|
return decodeStringBuilder(b);
|
||||||
|
|
||||||
|
if ((flag & MemcachedClient.MARKER_BYTEARR) == MemcachedClient.MARKER_BYTEARR)
|
||||||
|
return decodeByteArr(b);
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// decode methods
|
||||||
|
protected static Byte decodeByte(byte[] b) {
|
||||||
|
return new Byte(b[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Boolean decodeBoolean(byte[] b) {
|
||||||
|
boolean value = b[0] == 1;
|
||||||
|
return (value) ? Boolean.TRUE : Boolean.FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Integer decodeInteger(byte[] b) {
|
||||||
|
return new Integer(toInt(b));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Long decodeLong(byte[] b) throws Exception {
|
||||||
|
return new Long(toLong(b));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Character decodeCharacter(byte[] b) {
|
||||||
|
return new Character((char) decodeInteger(b).intValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String decodeString(byte[] b) throws Exception {
|
||||||
|
return new String(b, "UTF-8");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static StringBuffer decodeStringBuffer(byte[] b) throws Exception {
|
||||||
|
return new StringBuffer(decodeString(b));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Float decodeFloat(byte[] b) throws Exception {
|
||||||
|
Integer l = decodeInteger(b);
|
||||||
|
return new Float(Float.intBitsToFloat(l.intValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Short decodeShort(byte[] b) throws Exception {
|
||||||
|
return new Short((short) decodeInteger(b).intValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Double decodeDouble(byte[] b) throws Exception {
|
||||||
|
Long l = decodeLong(b);
|
||||||
|
return new Double(Double.longBitsToDouble(l.longValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Date decodeDate(byte[] b) {
|
||||||
|
return new Date(toLong(b));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static StringBuilder decodeStringBuilder(byte[] b)
|
||||||
|
throws Exception {
|
||||||
|
return new StringBuilder(decodeString(b));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] decodeByteArr(byte[] b) {
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This works by taking each of the bit patterns and converting them to ints
|
||||||
|
* taking into account 2s complement and then adding them..
|
||||||
|
*
|
||||||
|
* @param b
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
protected static int toInt(byte[] b) {
|
||||||
|
return (((((int) b[3]) & 0xFF) << 32) + ((((int) b[2]) & 0xFF) << 40)
|
||||||
|
+ ((((int) b[1]) & 0xFF) << 48) + ((((int) b[0]) & 0xFF) << 56));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static long toLong(byte[] b) {
|
||||||
|
return ((((long) b[7]) & 0xFF) + ((((long) b[6]) & 0xFF) << 8)
|
||||||
|
+ ((((long) b[5]) & 0xFF) << 16)
|
||||||
|
+ ((((long) b[4]) & 0xFF) << 24)
|
||||||
|
+ ((((long) b[3]) & 0xFF) << 32)
|
||||||
|
+ ((((long) b[2]) & 0xFF) << 40)
|
||||||
|
+ ((((long) b[1]) & 0xFF) << 48) + ((((long) b[0]) & 0xFF) << 56));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package cn.edu.buaa.act.memcachedClient;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bridge class to provide nested Exceptions with IOException which has
|
||||||
|
* constructors that don't take Throwables.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class NestedIOException extends IOException {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new <code>NestedIOException</code> instance.
|
||||||
|
*
|
||||||
|
* @param cause
|
||||||
|
* object of type throwable
|
||||||
|
*/
|
||||||
|
public NestedIOException(Throwable cause) {
|
||||||
|
super(cause.getMessage());
|
||||||
|
super.initCause(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public NestedIOException(String message, Throwable cause) {
|
||||||
|
super(message);
|
||||||
|
initCause(cause);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,432 @@
|
||||||
|
package memcached;
|
||||||
|
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
import java.io.BufferedOutputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MemCached client for Java, utility class for Socket IO.
|
||||||
|
*
|
||||||
|
* This class is a wrapper around a Socket and its streams.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class SockIO implements LineInputStream {
|
||||||
|
// logger
|
||||||
|
private static Logger log = Logger.getLogger(SockIO.class.getName());
|
||||||
|
|
||||||
|
// data
|
||||||
|
private String host;
|
||||||
|
private Socket sock;
|
||||||
|
|
||||||
|
private DataInputStream in;
|
||||||
|
private BufferedOutputStream out;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* creates a new SockIO object wrapping a socket connection to host:port,
|
||||||
|
* and its input and output streams
|
||||||
|
*
|
||||||
|
* @param pool
|
||||||
|
* Pool this object is tied to
|
||||||
|
* @param host
|
||||||
|
* host to connect to
|
||||||
|
* @param port
|
||||||
|
* port to connect to
|
||||||
|
* @param timeout
|
||||||
|
* int ms to block on data for read
|
||||||
|
* @param connectTimeout
|
||||||
|
* timeout (in ms) for initial connection
|
||||||
|
* @param noDelay
|
||||||
|
* TCP NODELAY option?
|
||||||
|
* @throws IOException
|
||||||
|
* if an io error occurrs when creating socket
|
||||||
|
* @throws UnknownHostException
|
||||||
|
* if hostname is invalid
|
||||||
|
*/
|
||||||
|
public SockIO(String host, int port, int timeout, int connectTimeout,
|
||||||
|
boolean noDelay) throws IOException, UnknownHostException {
|
||||||
|
// get a socket channel
|
||||||
|
sock = getSocket(host, port, connectTimeout);
|
||||||
|
|
||||||
|
if (timeout >= 0)
|
||||||
|
sock.setSoTimeout(timeout);
|
||||||
|
|
||||||
|
// testing only
|
||||||
|
sock.setTcpNoDelay(noDelay);
|
||||||
|
|
||||||
|
// wrap streams
|
||||||
|
in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
|
||||||
|
out = new BufferedOutputStream(sock.getOutputStream());
|
||||||
|
|
||||||
|
this.host = host + ":" + port;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* creates a new SockIO object wrapping a socket connection to host:port,
|
||||||
|
* and its input and output streams
|
||||||
|
*
|
||||||
|
* @param host
|
||||||
|
* hostname:port
|
||||||
|
* @param timeout
|
||||||
|
* read timeout value for connected socket
|
||||||
|
* @param connectTimeout
|
||||||
|
* timeout for initial connections
|
||||||
|
* @param noDelay
|
||||||
|
* TCP NODELAY option?
|
||||||
|
* @throws IOException
|
||||||
|
* if an io error occurrs when creating socket
|
||||||
|
* @throws UnknownHostException
|
||||||
|
* if hostname is invalid
|
||||||
|
*/
|
||||||
|
public SockIO(String host, int timeout, int connectTimeout, boolean noDelay)
|
||||||
|
throws IOException, UnknownHostException {
|
||||||
|
String[] ip = host.split(":");
|
||||||
|
|
||||||
|
// get socket: default is to use non-blocking connect
|
||||||
|
sock = getSocket(ip[0], Integer.parseInt(ip[1]), connectTimeout);
|
||||||
|
|
||||||
|
if (timeout >= 0)
|
||||||
|
this.sock.setSoTimeout(timeout);
|
||||||
|
|
||||||
|
// testing only
|
||||||
|
sock.setTcpNoDelay(noDelay);
|
||||||
|
|
||||||
|
// wrap streams
|
||||||
|
in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
|
||||||
|
out = new BufferedOutputStream(sock.getOutputStream());
|
||||||
|
|
||||||
|
this.host = host;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method which gets a connection from SocketChannel.
|
||||||
|
*
|
||||||
|
* @param host
|
||||||
|
* host to establish connection to
|
||||||
|
* @param port
|
||||||
|
* port on that host
|
||||||
|
* @param timeout
|
||||||
|
* connection timeout in ms
|
||||||
|
*
|
||||||
|
* @return connected socket
|
||||||
|
* @throws IOException
|
||||||
|
* if errors connecting or if connection times out
|
||||||
|
*/
|
||||||
|
protected static Socket getSocket(String host, int port, int timeout)
|
||||||
|
throws IOException {
|
||||||
|
SocketChannel sock = SocketChannel.open();
|
||||||
|
sock.socket().connect(new InetSocketAddress(host, port), timeout);
|
||||||
|
return sock.socket();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lets caller get access to underlying channel.
|
||||||
|
*
|
||||||
|
* @return the backing SocketChannel
|
||||||
|
*/
|
||||||
|
public SocketChannel getChannel() {
|
||||||
|
return sock.getChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns the host this socket is connected to
|
||||||
|
*
|
||||||
|
* @return String representation of host (hostname:port)
|
||||||
|
*/
|
||||||
|
public String getHost() {
|
||||||
|
return this.host;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* closes socket and all streams connected to it
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* if fails to close streams or socket
|
||||||
|
*/
|
||||||
|
public void trueClose() throws IOException {
|
||||||
|
trueClose(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* closes socket and all streams connected to it
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* if fails to close streams or socket
|
||||||
|
*/
|
||||||
|
public void trueClose(boolean addToDeadPool) throws IOException {
|
||||||
|
if (log.isDebugEnabled())
|
||||||
|
log.debug("++++ Closing socket for real: " + toString());
|
||||||
|
|
||||||
|
boolean err = false;
|
||||||
|
StringBuilder errMsg = new StringBuilder();
|
||||||
|
|
||||||
|
if (in != null) {
|
||||||
|
try {
|
||||||
|
in.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
log.error("++++ error closing input stream for socket: "
|
||||||
|
+ toString() + " for host: " + getHost());
|
||||||
|
log.error(ioe.getMessage(), ioe);
|
||||||
|
errMsg.append("++++ error closing input stream for socket: "
|
||||||
|
+ toString() + " for host: " + getHost() + "\n");
|
||||||
|
errMsg.append(ioe.getMessage());
|
||||||
|
err = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (out != null) {
|
||||||
|
try {
|
||||||
|
out.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
log.error("++++ error closing output stream for socket: "
|
||||||
|
+ toString() + " for host: " + getHost());
|
||||||
|
log.error(ioe.getMessage(), ioe);
|
||||||
|
errMsg.append("++++ error closing output stream for socket: "
|
||||||
|
+ toString() + " for host: " + getHost() + "\n");
|
||||||
|
errMsg.append(ioe.getMessage());
|
||||||
|
err = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sock != null) {
|
||||||
|
try {
|
||||||
|
sock.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
log.error("++++ error closing socket: " + toString()
|
||||||
|
+ " for host: " + getHost());
|
||||||
|
log.error(ioe.getMessage(), ioe);
|
||||||
|
errMsg.append("++++ error closing socket: " + toString()
|
||||||
|
+ " for host: " + getHost() + "\n");
|
||||||
|
errMsg.append(ioe.getMessage());
|
||||||
|
err = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check in to pool
|
||||||
|
if (addToDeadPool && sock != null) {
|
||||||
|
// check in to pool pool.checkIn( this, false );
|
||||||
|
System.out.println("check in to pool ");
|
||||||
|
}
|
||||||
|
|
||||||
|
in = null;
|
||||||
|
out = null;
|
||||||
|
sock = null;
|
||||||
|
|
||||||
|
if (err)
|
||||||
|
throw new IOException(errMsg.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sets closed flag and checks in to connection pool but does not close
|
||||||
|
* connections
|
||||||
|
*/
|
||||||
|
void close() {
|
||||||
|
// check in to pool
|
||||||
|
if (log.isDebugEnabled())
|
||||||
|
log.debug("++++ marking socket (" + this.toString()
|
||||||
|
+ ") as closed and available to return to avail pool");
|
||||||
|
// pool.checkIn( this );
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* checks if the connection is open
|
||||||
|
*
|
||||||
|
* @return true if connected
|
||||||
|
*/
|
||||||
|
boolean isConnected() {
|
||||||
|
return (sock != null && sock.isConnected());
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* checks to see that the connection is still working
|
||||||
|
*
|
||||||
|
* @return true if still alive
|
||||||
|
*/
|
||||||
|
boolean isAlive() {
|
||||||
|
|
||||||
|
if (!isConnected())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// try to talk to the server w/ a dumb query to ask its version
|
||||||
|
try {
|
||||||
|
this.write("version\r\n".getBytes());
|
||||||
|
this.flush();
|
||||||
|
this.readLine();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* reads a line intentionally not using the deprecated readLine method from
|
||||||
|
* DataInputStream
|
||||||
|
*
|
||||||
|
* @return String that was read in
|
||||||
|
* @throws IOException
|
||||||
|
* if io problems during read
|
||||||
|
*/
|
||||||
|
public String readLine() throws IOException {
|
||||||
|
if (sock == null || !sock.isConnected()) {
|
||||||
|
log.error("++++ attempting to read from closed socket");
|
||||||
|
throw new IOException("++++ attempting to read from closed socket");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] b = new byte[1];
|
||||||
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
boolean eol = false;
|
||||||
|
|
||||||
|
while (in.read(b, 0, 1) != -1) {
|
||||||
|
|
||||||
|
if (b[0] == 13) {
|
||||||
|
eol = true;
|
||||||
|
} else {
|
||||||
|
if (eol) {
|
||||||
|
if (b[0] == 10)
|
||||||
|
break;
|
||||||
|
|
||||||
|
eol = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cast byte into char array
|
||||||
|
bos.write(b, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bos == null || bos.size() <= 0) {
|
||||||
|
throw new IOException(
|
||||||
|
"++++ Stream appears to be dead, so closing it down");
|
||||||
|
}
|
||||||
|
|
||||||
|
// else return the string
|
||||||
|
return bos.toString().trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* reads up to end of line and returns nothing
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* if io problems during read
|
||||||
|
*/
|
||||||
|
public void clearEOL() throws IOException {
|
||||||
|
if (sock == null || !sock.isConnected()) {
|
||||||
|
log.error("++++ attempting to read from closed socket");
|
||||||
|
throw new IOException("++++ attempting to read from closed socket");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] b = new byte[1];
|
||||||
|
boolean eol = false;
|
||||||
|
while (in.read(b, 0, 1) != -1) {
|
||||||
|
|
||||||
|
// only stop when we see
|
||||||
|
// \r (13) followed by \n (10)
|
||||||
|
if (b[0] == 13) {
|
||||||
|
eol = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eol) {
|
||||||
|
if (b[0] == 10)
|
||||||
|
break;
|
||||||
|
|
||||||
|
eol = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* reads length bytes into the passed in byte array from dtream
|
||||||
|
*
|
||||||
|
* @param b
|
||||||
|
* byte array
|
||||||
|
* @throws IOException
|
||||||
|
* if io problems during read
|
||||||
|
*/
|
||||||
|
public int read(byte[] b) throws IOException {
|
||||||
|
if (sock == null || !sock.isConnected()) {
|
||||||
|
log.error("++++ attempting to read from closed socket");
|
||||||
|
throw new IOException("++++ attempting to read from closed socket");
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (count < b.length) {
|
||||||
|
int cnt = in.read(b, count, (b.length - count));
|
||||||
|
count += cnt;
|
||||||
|
}
|
||||||
|
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* flushes output stream
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* if io problems during read
|
||||||
|
*/
|
||||||
|
void flush() throws IOException {
|
||||||
|
if (sock == null || !sock.isConnected()) {
|
||||||
|
log.error("++++ attempting to write to closed socket");
|
||||||
|
throw new IOException("++++ attempting to write to closed socket");
|
||||||
|
}
|
||||||
|
out.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* writes a byte array to the output stream
|
||||||
|
*
|
||||||
|
* @param b
|
||||||
|
* byte array to write
|
||||||
|
* @throws IOException
|
||||||
|
* if an io error happens
|
||||||
|
*/
|
||||||
|
void write(byte[] b) throws IOException {
|
||||||
|
if (sock == null || !sock.isConnected()) {
|
||||||
|
log.error("++++ attempting to write to closed socket");
|
||||||
|
throw new IOException("++++ attempting to write to closed socket");
|
||||||
|
}
|
||||||
|
out.write(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* use the sockets hashcode for this object so we can key off of SockIOs
|
||||||
|
*
|
||||||
|
* @return int hashcode
|
||||||
|
*/
|
||||||
|
public int hashCode() {
|
||||||
|
return (sock == null) ? 0 : sock.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns the string representation of this socket
|
||||||
|
*
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public String toString() {
|
||||||
|
return (sock == null) ? "" : sock.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hack to reap any leaking children.
|
||||||
|
*/
|
||||||
|
protected void finalize() throws Throwable {
|
||||||
|
try {
|
||||||
|
if (sock != null) {
|
||||||
|
log.error("++++ closing potentially leaked socket in finalize");
|
||||||
|
sock.close();
|
||||||
|
sock = null;
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.error(t.getMessage(), t);
|
||||||
|
} finally {
|
||||||
|
super.finalize();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,9 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
public class ClientConfig
|
||||||
|
{
|
||||||
|
public int id;
|
||||||
|
public String host;
|
||||||
|
public int client_port;
|
||||||
|
public String memcached;
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
public class LockKey {
|
||||||
|
public Integer memNumber = 0;
|
||||||
|
public Integer ncount = 0;
|
||||||
|
public Integer state = unLock;
|
||||||
|
public long time;
|
||||||
|
|
||||||
|
LockKey(Integer num, Integer count, long t, Integer s) {
|
||||||
|
memNumber = num;
|
||||||
|
ncount = count;
|
||||||
|
time = t;
|
||||||
|
state = s;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final static Integer unLock = 0;
|
||||||
|
public final static Integer badLock = 1;
|
||||||
|
public final static Integer waitLock = 2;
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||||
|
|
||||||
|
import common.EMSGID;
|
||||||
|
|
||||||
|
public class MDecoder extends FrameDecoder
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||||
|
if (buffer.readableBytes() < 4)
|
||||||
|
{
|
||||||
|
return null;//(1)
|
||||||
|
}
|
||||||
|
int dataLength = buffer.getInt(buffer.readerIndex());
|
||||||
|
if (buffer.readableBytes() < dataLength + 4)
|
||||||
|
{
|
||||||
|
return null;//(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer.skipBytes(4);//(3)
|
||||||
|
int id = buffer.readInt();
|
||||||
|
byte[] decoded = new byte[dataLength-4];
|
||||||
|
|
||||||
|
buffer.readBytes(decoded);
|
||||||
|
NetMsg msg = new NetMsg(decoded, id);//(4)
|
||||||
|
msg.setMsgID(EMSGID.values()[id]);
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffers;
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
|
||||||
|
|
||||||
|
public class MEncoder extends OneToOneEncoder
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception
|
||||||
|
{
|
||||||
|
if (!(msg instanceof NetMsg))
|
||||||
|
{
|
||||||
|
return msg;//(1)
|
||||||
|
}
|
||||||
|
NetMsg res = (NetMsg)msg;
|
||||||
|
byte[] data = res.getBytes();
|
||||||
|
int dataLength = data.length+4;
|
||||||
|
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)
|
||||||
|
buf.writeInt(dataLength);
|
||||||
|
buf.writeInt(res.msgID.ordinal());
|
||||||
|
buf.writeBytes(data);
|
||||||
|
return buf;//(3)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
|
import org.jboss.netty.channel.ExceptionEvent;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||||
|
|
||||||
|
|
||||||
|
public class MServerHandler extends SimpleChannelUpstreamHandler
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||||
|
{
|
||||||
|
if (!(e.getMessage() instanceof NetMsg))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
memSession.getInstance().addSession(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||||
|
{
|
||||||
|
Channel channel = e.getChannel();
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
|
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||||
|
import org.jboss.netty.channel.Channels;
|
||||||
|
|
||||||
|
public class MServerPipelineFactory implements ChannelPipelineFactory
|
||||||
|
{
|
||||||
|
public ChannelPipeline getPipeline() throws Exception
|
||||||
|
{
|
||||||
|
ChannelPipeline pipeline = Channels.pipeline();
|
||||||
|
|
||||||
|
pipeline.addLast("decoder", new MDecoder());
|
||||||
|
pipeline.addLast("encoder", new MEncoder());
|
||||||
|
pipeline.addLast("handler", new MServerHandler());
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,143 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Scanner;
|
||||||
|
|
||||||
|
import javax.xml.parsers.DocumentBuilder;
|
||||||
|
import javax.xml.parsers.DocumentBuilderFactory;
|
||||||
|
import javax.xml.parsers.ParserConfigurationException;
|
||||||
|
|
||||||
|
import org.w3c.dom.Document;
|
||||||
|
import org.w3c.dom.Element;
|
||||||
|
import org.w3c.dom.Node;
|
||||||
|
import org.w3c.dom.NodeList;
|
||||||
|
import org.xml.sax.SAXException;
|
||||||
|
|
||||||
|
import client.Client;
|
||||||
|
import client.ClientMgr;
|
||||||
|
|
||||||
|
import common.RegisterHandler;
|
||||||
|
|
||||||
|
public class MemcachedMain {
|
||||||
|
HashMap<Integer, ClientConfig> m_mapMemcachedClient;
|
||||||
|
String webServerHost;
|
||||||
|
String protocolName;
|
||||||
|
|
||||||
|
public boolean initConfig() {
|
||||||
|
m_mapMemcachedClient = new HashMap<Integer, ClientConfig>();
|
||||||
|
|
||||||
|
File f = new File(System.getProperty("user.dir"));
|
||||||
|
String path = f.getPath() + File.separator + "bin" + File.separator;
|
||||||
|
readClientsXML(path + "client.xml");
|
||||||
|
try {
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.load(new FileInputStream(path+"config.properties"));
|
||||||
|
webServerHost = properties.getProperty("webServerHost").toString();
|
||||||
|
ClientMgr.nCopyNode = Integer.parseInt(properties.getProperty("replicasNum"));
|
||||||
|
protocolName = properties.getProperty("consistencyProtocol").toString();
|
||||||
|
if(protocolName.equals("twoPhaseCommit")){
|
||||||
|
ClientMgr.protocol = ClientMgr.twoPhaseCommit;
|
||||||
|
}else if(protocolName.equals("paxos")){
|
||||||
|
ClientMgr.protocol = ClientMgr.paxos;
|
||||||
|
}else if(protocolName.equals("weak")){
|
||||||
|
ClientMgr.protocol = ClientMgr.weak;
|
||||||
|
}else{
|
||||||
|
System.err.print("consistency protocol input error");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMemcachedNumber() {
|
||||||
|
System.out.print("Please in put R-Memcached ID:");
|
||||||
|
@SuppressWarnings("resource")
|
||||||
|
Scanner scanner = new Scanner(System.in);
|
||||||
|
return Integer.decode(scanner.next());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
initConfig();
|
||||||
|
int num = getMemcachedNumber();
|
||||||
|
|
||||||
|
RegisterHandler.initHandler();
|
||||||
|
memSession.getInstance().start(m_mapMemcachedClient.get(num).memcached);
|
||||||
|
|
||||||
|
ClientMgr clientMgr = ClientMgr.getInstance();
|
||||||
|
Server server = Server.getInstance();
|
||||||
|
server.init(m_mapMemcachedClient.get(num).client_port);
|
||||||
|
clientMgr.init(num, m_mapMemcachedClient);
|
||||||
|
|
||||||
|
Client webClient = new Client();
|
||||||
|
webClient.init(webServerHost, 8888);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
MemcachedMain entrance = new MemcachedMain();
|
||||||
|
entrance.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ¶ÁÈ¡memcached clientÅäÖÃ
|
||||||
|
public boolean readClientsXML(String str) {
|
||||||
|
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
|
||||||
|
try {
|
||||||
|
factory.setIgnoringElementContentWhitespace(true);
|
||||||
|
|
||||||
|
DocumentBuilder db = factory.newDocumentBuilder();
|
||||||
|
Document xmldoc = db.parse(new File(str));
|
||||||
|
Element elmtInfo = xmldoc.getDocumentElement();
|
||||||
|
NodeList nodes = elmtInfo.getChildNodes();
|
||||||
|
for (int i = 0; i < nodes.getLength(); i++) {
|
||||||
|
Node result = nodes.item(i);
|
||||||
|
if (result.getNodeType() == Node.ELEMENT_NODE
|
||||||
|
&& result.getNodeName().equals("client")) {
|
||||||
|
NodeList ns = result.getChildNodes();
|
||||||
|
ClientConfig localClient = new ClientConfig();
|
||||||
|
int m = 0;
|
||||||
|
for (int j = 0; j < ns.getLength(); j++) {
|
||||||
|
Node record = ns.item(j);
|
||||||
|
if (record.getNodeType() == Node.ELEMENT_NODE) {
|
||||||
|
if (record.getNodeName().equals("id")) {
|
||||||
|
m++;
|
||||||
|
localClient.id = Integer.decode(record
|
||||||
|
.getTextContent());
|
||||||
|
} else if (record.getNodeName().equals("host")) {
|
||||||
|
m++;
|
||||||
|
localClient.host = record.getTextContent();
|
||||||
|
} else if (record.getNodeName().equals(
|
||||||
|
"client_port")) {
|
||||||
|
m++;
|
||||||
|
localClient.client_port = Integer.decode(record
|
||||||
|
.getTextContent());
|
||||||
|
} else if (record.getNodeName().equals("memcached")) {
|
||||||
|
m++;
|
||||||
|
localClient.memcached = record.getTextContent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (m == 4) {
|
||||||
|
m_mapMemcachedClient.put(localClient.id, localClient);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (ParserConfigurationException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (SAXException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
package server;
|
||||||
|
import com.google.protobuf.GeneratedMessage;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
|
|
||||||
|
import common.EMSGID;
|
||||||
|
import common.MessageManager;
|
||||||
|
|
||||||
|
public class NetMsg // package different messages
|
||||||
|
{
|
||||||
|
EMSGID msgID;
|
||||||
|
MessageLite messageLite;
|
||||||
|
|
||||||
|
private NetMsg(){};
|
||||||
|
public static NetMsg newMessage()
|
||||||
|
{
|
||||||
|
return new NetMsg();
|
||||||
|
}
|
||||||
|
|
||||||
|
NetMsg(byte[] decoded, int id) throws Exception
|
||||||
|
{
|
||||||
|
messageLite = MessageManager.getMessage(id, decoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getBytes() //get data in messageLite
|
||||||
|
{
|
||||||
|
return messageLite.toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
public EMSGID getMsgID() //get message catagory
|
||||||
|
{
|
||||||
|
return msgID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMsgID(EMSGID id) { //according EMSGID.java set the message ID
|
||||||
|
this.msgID = id;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <T extends MessageLite> T getMessageLite() //get messageLite
|
||||||
|
{
|
||||||
|
return (T)messageLite;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void setMessageLite(GeneratedMessage.Builder builder)
|
||||||
|
{
|
||||||
|
this.messageLite = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import org.jboss.netty.bootstrap.ServerBootstrap;
|
||||||
|
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||||
|
|
||||||
|
public class Server
|
||||||
|
{
|
||||||
|
static Server server;
|
||||||
|
ServerBootstrap bootstrap;
|
||||||
|
|
||||||
|
public void init(int port)
|
||||||
|
{
|
||||||
|
bootstrap = new ServerBootstrap(
|
||||||
|
new NioServerSocketChannelFactory(
|
||||||
|
Executors.newCachedThreadPool(),
|
||||||
|
Executors.newCachedThreadPool()));
|
||||||
|
|
||||||
|
// Set up the default event pipeline.
|
||||||
|
bootstrap.setPipelineFactory(new MServerPipelineFactory());
|
||||||
|
bootstrap.setOption("child.tcpNoDelay", true);
|
||||||
|
bootstrap.setOption("child.keepAlive", true);
|
||||||
|
bootstrap.setOption("reuseAddress", true);
|
||||||
|
// Bind and start to accept incoming connections.
|
||||||
|
bootstrap.bind(new InetSocketAddress(port));
|
||||||
|
|
||||||
|
System.out.println("server init complete");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
bootstrap.releaseExternalResources();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Server getInstance()
|
||||||
|
{
|
||||||
|
if (server == null)
|
||||||
|
{
|
||||||
|
server = new Server();
|
||||||
|
}
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,507 @@
|
||||||
|
package server;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
import messageBody.memcachedmsg.nm_Connected;
|
||||||
|
import messageBody.memcachedmsg.nm_Connected_mem_back;
|
||||||
|
import messageBody.memcachedmsg.nm_read;
|
||||||
|
import messageBody.memcachedmsg.nm_read_recovery;
|
||||||
|
import messageBody.memcachedmsg.nm_write_1;
|
||||||
|
import messageBody.memcachedmsg.nm_write_1_res;
|
||||||
|
import messageBody.memcachedmsg.nm_write_2;
|
||||||
|
import messageBody.requestMsg.nr_Connected_mem_back;
|
||||||
|
import messageBody.requestMsg.nr_Read;
|
||||||
|
import messageBody.requestMsg.nr_Read_res;
|
||||||
|
import messageBody.requestMsg.nr_write;
|
||||||
|
import messageBody.requestMsg.nr_write_res;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.util.internal.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import memcached.MemcachedClient;
|
||||||
|
import client.ClientMgr;
|
||||||
|
|
||||||
|
import common.EMSGID;
|
||||||
|
|
||||||
|
public class memSession implements Runnable {
|
||||||
|
ConcurrentLinkedQueue<MessageEvent> recvQueue = new ConcurrentLinkedQueue<MessageEvent>();
|
||||||
|
ConcurrentHashMap<Integer, Channel> ClientChannelMap = new ConcurrentHashMap<Integer, Channel>();
|
||||||
|
ConcurrentHashMap<String, LockKey> LockKeyMap = new ConcurrentHashMap<String, LockKey>();
|
||||||
|
public MemcachedClient client;
|
||||||
|
static memSession session = null;
|
||||||
|
|
||||||
|
Channel webServeChannel = null;
|
||||||
|
|
||||||
|
public static memSession getInstance() {
|
||||||
|
if (session == null) {
|
||||||
|
session = new memSession();
|
||||||
|
}
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start(String host) {
|
||||||
|
client = new MemcachedClient(host);
|
||||||
|
new Thread(session).start();
|
||||||
|
System.out.println("session start");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ôö¼ÓclientÁ¬½Ó
|
||||||
|
public void addClientChannel(Integer num, Channel ch) {
|
||||||
|
ClientChannelMap.put(num, ch);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Channel getClientChannel(Integer id) {
|
||||||
|
return ClientChannelMap.get(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ɾµôclientÁ¬½Ó
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void removeClientChannel(Channel ch) {
|
||||||
|
Iterator iter = ClientChannelMap.entrySet().iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Entry entry = (Entry) iter.next();
|
||||||
|
if ((Channel) entry.getValue() == ch) {
|
||||||
|
ClientChannelMap.remove((Integer) entry.getKey());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ////////////////////////////////////////////////////////
|
||||||
|
public void run() {
|
||||||
|
long curTime = System.currentTimeMillis();
|
||||||
|
while (true) {
|
||||||
|
MessageEvent event = recvQueue.poll();
|
||||||
|
while (event != null) {
|
||||||
|
handle(event);
|
||||||
|
event = recvQueue.poll();
|
||||||
|
if (System.currentTimeMillis() - curTime > 20000) {
|
||||||
|
curTime = System.currentTimeMillis();
|
||||||
|
HandleBadLock(curTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep((long) 0.00001);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public void HandleBadLock(long curTime) {
|
||||||
|
Iterator it = LockKeyMap.keySet().iterator();
|
||||||
|
while(it.hasNext()){
|
||||||
|
String key = (String) it.next();
|
||||||
|
LockKey value = LockKeyMap.get(key);
|
||||||
|
if (value.state == LockKey.waitLock) {
|
||||||
|
if (curTime - value.time > 20000) {
|
||||||
|
value.state = LockKey.badLock;
|
||||||
|
LockKeyMap.remove(key);
|
||||||
|
LockKeyMap.put(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getLockState(String key) {
|
||||||
|
LockKey lock = LockKeyMap.get(key);
|
||||||
|
if (lock == null) {
|
||||||
|
return LockKey.unLock;
|
||||||
|
}
|
||||||
|
return lock.state;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLockState(String key, Integer state) {
|
||||||
|
LockKey lock = LockKeyMap.get(key);
|
||||||
|
if (lock != null) {
|
||||||
|
lock.state = state;
|
||||||
|
LockKeyMap.put(key, lock);
|
||||||
|
} else {
|
||||||
|
System.out.println("set Lock state error");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean lockKey(String key, LockKey lock) {
|
||||||
|
LockKey lockKey = LockKeyMap.put(key, lock);
|
||||||
|
if (lockKey != null && (lockKey.state == 0 || lockKey.state == 1)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return lockKey == null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int desLockKeyCount(String key) {
|
||||||
|
LockKey lock = LockKeyMap.get(key);
|
||||||
|
if (lock != null) {
|
||||||
|
lock.ncount--;
|
||||||
|
LockKeyMap.put(key, lock);
|
||||||
|
return lock.ncount;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean removeLock(String key) {
|
||||||
|
return LockKeyMap.remove(key) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handle(MessageEvent e) {
|
||||||
|
NetMsg msg = (NetMsg) e.getMessage();
|
||||||
|
|
||||||
|
switch (msg.getMsgID()) {
|
||||||
|
case nm_connected: {
|
||||||
|
nm_Connected msgLite = msg.getMessageLite();
|
||||||
|
addClientChannel(msgLite.getNum(), e.getChannel());
|
||||||
|
|
||||||
|
nm_Connected_mem_back.Builder builder = nm_Connected_mem_back
|
||||||
|
.newBuilder();
|
||||||
|
builder.setNum(ClientMgr.getInstance().mClientNumber);
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nm_connected_mem_back);
|
||||||
|
|
||||||
|
e.getChannel().write(send);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nm_connected_mem_back: {
|
||||||
|
nm_Connected_mem_back msgLite = msg.getMessageLite();
|
||||||
|
addClientChannel(msgLite.getNum(), e.getChannel());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nr_connected_mem: {
|
||||||
|
webServeChannel = e.getChannel();
|
||||||
|
|
||||||
|
nr_Connected_mem_back.Builder builder = nr_Connected_mem_back
|
||||||
|
.newBuilder();
|
||||||
|
builder.setMemID(ClientMgr.getInstance().mClientNumber);
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nr_connected_mem_back);
|
||||||
|
|
||||||
|
e.getChannel().write(send);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nm_connected_web_back: {
|
||||||
|
webServeChannel = e.getChannel();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nr_read: {
|
||||||
|
nr_Read msgLite = msg.getMessageLite();
|
||||||
|
|
||||||
|
Integer state = getLockState(msgLite.getKey());
|
||||||
|
if (state == LockKey.waitLock) {
|
||||||
|
nr_Read_res.Builder builder = nr_Read_res.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setValue("");
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nr_read_res);
|
||||||
|
|
||||||
|
webServeChannel.write(send);
|
||||||
|
return;
|
||||||
|
} else if (state == LockKey.unLock) {
|
||||||
|
String value = (String) client.get(msgLite.getKey());
|
||||||
|
if (value != null) {
|
||||||
|
nr_Read_res.Builder builder = nr_Read_res.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setValue(value);
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nr_read_res);
|
||||||
|
webServeChannel.write(send);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Can't get data in local Memcached server, try to ask for data from another cache node
|
||||||
|
nm_read.Builder builder = nm_read.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nm_read);
|
||||||
|
|
||||||
|
if (sendOtherCopyMsg(gethashMem(msgLite.getKey()), send) == false) {
|
||||||
|
nr_Read_res.Builder builder1 = nr_Read_res.newBuilder();
|
||||||
|
builder1.setKey(msgLite.getKey());
|
||||||
|
builder.setValue("");
|
||||||
|
builder1.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send1 = NetMsg.newMessage();
|
||||||
|
send1.setMessageLite(builder1);
|
||||||
|
send1.setMsgID(EMSGID.nr_read_res);
|
||||||
|
|
||||||
|
webServeChannel.write(send);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nm_read: {
|
||||||
|
nm_read msgLite = msg.getMessageLite();
|
||||||
|
Integer state = getLockState(msgLite.getKey());
|
||||||
|
if (state == LockKey.unLock) {
|
||||||
|
String value = (String) client.get(msgLite.getKey());
|
||||||
|
if (value != null) {
|
||||||
|
nr_Read_res.Builder builder = nr_Read_res.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
builder.setValue(value);
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nr_read_res);
|
||||||
|
webServeChannel.write(send);
|
||||||
|
|
||||||
|
nm_read_recovery.Builder builder1 = nm_read_recovery.newBuilder();
|
||||||
|
builder1.setKey(msgLite.getKey());
|
||||||
|
builder1.setTime(msgLite.getTime());
|
||||||
|
builder1.setValue(value);
|
||||||
|
NetMsg send1 = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder1);
|
||||||
|
send.setMsgID(EMSGID.nm_read_recovery);
|
||||||
|
e.getChannel().write(send1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nr_Read_res.Builder builder = nr_Read_res.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setValue("");
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nr_read_res);
|
||||||
|
|
||||||
|
webServeChannel.write(send);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nm_read_recovery: {
|
||||||
|
nm_read_recovery msgLite = msg.getMessageLite();
|
||||||
|
Integer state = getLockState(msgLite.getKey());
|
||||||
|
if(state == LockKey.waitLock) {
|
||||||
|
System.out.println("recovery fail because of waitlock.");
|
||||||
|
return;
|
||||||
|
}else if (state == LockKey.badLock) {
|
||||||
|
removeLock(msgLite.getKey());
|
||||||
|
}
|
||||||
|
boolean res = client.set(msgLite.getKey(), msgLite.getValue());
|
||||||
|
if (!res) {
|
||||||
|
setLockState(msgLite.getKey(), LockKey.badLock);
|
||||||
|
System.err.println("read recovery fail");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case nr_write: {
|
||||||
|
nr_write msgLite = msg.getMessageLite();
|
||||||
|
Integer state = getLockState(msgLite.getKey());
|
||||||
|
if (state == LockKey.waitLock) {
|
||||||
|
System.out.println("write conflict, please request again.");
|
||||||
|
nr_write_res.Builder builder2 = nr_write_res.newBuilder();
|
||||||
|
builder2.setKey(msgLite.getKey());
|
||||||
|
builder2.setValue("");
|
||||||
|
builder2.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send2 = NetMsg.newMessage();
|
||||||
|
send2.setMessageLite(builder2);
|
||||||
|
send2.setMsgID(EMSGID.nr_write_res);
|
||||||
|
webServeChannel.write(send2);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// else if (state == LockKey.badLock) {
|
||||||
|
// removeLock(msgLite.getKey());
|
||||||
|
// }
|
||||||
|
LockKey lockKey = new LockKey(
|
||||||
|
ClientMgr.getInstance().mClientNumber, ClientMgr.nCopyNode-1,
|
||||||
|
System.currentTimeMillis(), LockKey.waitLock);
|
||||||
|
if (lockKey(msgLite.getKey(), lockKey) == false) {
|
||||||
|
System.out.println("nr_write lock fail");
|
||||||
|
nr_write_res.Builder builder2 = nr_write_res.newBuilder();
|
||||||
|
builder2.setKey(msgLite.getKey());
|
||||||
|
builder2.setValue("");
|
||||||
|
builder2.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send2 = NetMsg.newMessage();
|
||||||
|
send2.setMessageLite(builder2);
|
||||||
|
send2.setMsgID(EMSGID.nr_write_res);
|
||||||
|
webServeChannel.write(send2);
|
||||||
|
System.out.println("write lock conflict, please request again.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
nm_write_1.Builder builder = nm_write_1.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setValue(msgLite.getValue());
|
||||||
|
builder.setMemID(ClientMgr.getInstance().mClientNumber);
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nm_write_1);
|
||||||
|
int count = sendOtherAllCopyMsg(gethashMem(msgLite.getKey()), send);
|
||||||
|
|
||||||
|
if (count != ClientMgr.nCopyNode-1) {
|
||||||
|
System.out.println("write failed in send invalidation msg to remote node.");
|
||||||
|
|
||||||
|
nr_write_res.Builder builder2 = nr_write_res.newBuilder();
|
||||||
|
builder2.setKey(msgLite.getKey());
|
||||||
|
builder2.setValue("");
|
||||||
|
builder2.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send2 = NetMsg.newMessage();
|
||||||
|
send2.setMessageLite(builder2);
|
||||||
|
send2.setMsgID(EMSGID.nr_write_res);
|
||||||
|
webServeChannel.write(send2);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case nm_write_1: {
|
||||||
|
nm_write_1 msgLite = msg.getMessageLite();
|
||||||
|
|
||||||
|
Integer state = getLockState(msgLite.getKey());
|
||||||
|
if (state == LockKey.waitLock) {
|
||||||
|
removeLock(msgLite.getKey());
|
||||||
|
}
|
||||||
|
else if (state == LockKey.badLock) {
|
||||||
|
removeLock(msgLite.getKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
// System.out.println("ready to write key: "+msgLite.getKey());
|
||||||
|
|
||||||
|
LockKey lockKey = new LockKey(
|
||||||
|
ClientMgr.getInstance().mClientNumber, 0,
|
||||||
|
System.currentTimeMillis(), LockKey.waitLock);
|
||||||
|
if (lockKey(msgLite.getKey(), lockKey) == false) {
|
||||||
|
System.out.println("nm_write_1 Lock fail");
|
||||||
|
}
|
||||||
|
|
||||||
|
nm_write_1_res.Builder builder = nm_write_1_res.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setValue(msgLite.getValue());
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
builder.setMemID(msgLite.getMemID());
|
||||||
|
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nm_write_1_res);
|
||||||
|
getClientChannel(msgLite.getMemID()).write(send);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nm_write_1_res: {
|
||||||
|
nm_write_1_res msgLite = msg.getMessageLite();
|
||||||
|
|
||||||
|
if (desLockKeyCount(msgLite.getKey()) == ClientMgr.protocol) {
|
||||||
|
boolean res = client.set(msgLite.getKey(),
|
||||||
|
msgLite.getValue());
|
||||||
|
if (res) {
|
||||||
|
removeLock(msgLite.getKey());
|
||||||
|
// System.out.println("key:"+msgLite.getKey()+", value:"+msgLite.getValue());
|
||||||
|
nr_write_res.Builder builder2 = nr_write_res.newBuilder();
|
||||||
|
builder2.setKey(msgLite.getKey());
|
||||||
|
builder2.setValue(msgLite.getValue());
|
||||||
|
builder2.setTime(msgLite.getTime());
|
||||||
|
NetMsg send2 = NetMsg.newMessage();
|
||||||
|
send2.setMessageLite(builder2);
|
||||||
|
send2.setMsgID(EMSGID.nr_write_res);
|
||||||
|
webServeChannel.write(send2);
|
||||||
|
|
||||||
|
nm_write_2.Builder builder = nm_write_2.newBuilder();
|
||||||
|
builder.setKey(msgLite.getKey());
|
||||||
|
builder.setValue(msgLite.getValue());
|
||||||
|
builder.setMemID(msgLite.getMemID());
|
||||||
|
builder.setTime(msgLite.getTime());
|
||||||
|
NetMsg send = NetMsg.newMessage();
|
||||||
|
send.setMessageLite(builder);
|
||||||
|
send.setMsgID(EMSGID.nm_write_2);
|
||||||
|
sendOtherAllCopyMsg(gethashMem(msgLite.getKey()),send);
|
||||||
|
} else {
|
||||||
|
setLockState(msgLite.getKey(), LockKey.badLock);
|
||||||
|
System.err.println("write to memcached server error");
|
||||||
|
nr_write_res.Builder builder2 = nr_write_res.newBuilder();
|
||||||
|
builder2.setKey(msgLite.getKey());
|
||||||
|
builder2.setValue("");
|
||||||
|
builder2.setTime(msgLite.getTime());
|
||||||
|
|
||||||
|
NetMsg send2 = NetMsg.newMessage();
|
||||||
|
send2.setMessageLite(builder2);
|
||||||
|
send2.setMsgID(EMSGID.nr_write_res);
|
||||||
|
webServeChannel.write(send2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case nm_write_2: {
|
||||||
|
nm_write_2 msgLite = msg.getMessageLite();
|
||||||
|
|
||||||
|
boolean res = client.set(msgLite.getKey(), msgLite.getValue());
|
||||||
|
if (res) {
|
||||||
|
removeLock(msgLite.getKey());
|
||||||
|
// System.out.println("key:"+msgLite.getKey()+", value:"+msgLite.getValue());
|
||||||
|
} else {
|
||||||
|
setLockState(msgLite.getKey(), LockKey.badLock);
|
||||||
|
System.err.println("write in write_2 fail");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
System.err.println(msg.getMsgID().toString());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSession(MessageEvent e) {
|
||||||
|
recvQueue.offer(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int gethashMem(String key) {
|
||||||
|
return Math.abs(key.hashCode() % ClientMgr.getInstance().getSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean sendOtherCopyMsg(Integer hash, NetMsg msg) {
|
||||||
|
for (int i = 0; i < ClientMgr.nCopyNode; i++) {
|
||||||
|
Integer index = (hash + i + ClientMgr.getInstance().getSize())
|
||||||
|
% ClientMgr.getInstance().getSize();
|
||||||
|
if (index == ClientMgr.getInstance().mClientNumber)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
Channel eChannel = getClientChannel(index);
|
||||||
|
if (eChannel != null) {
|
||||||
|
eChannel.write(msg);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int sendOtherAllCopyMsg(int hash, NetMsg msg) {
|
||||||
|
int count = 0;
|
||||||
|
for (int i = 0; i < ClientMgr.nCopyNode; i++) {
|
||||||
|
int index = (hash + i + ClientMgr.getInstance().getSize())
|
||||||
|
% ClientMgr.getInstance().getSize();
|
||||||
|
|
||||||
|
if (index == ClientMgr.getInstance().mClientNumber)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
Channel eChannel = getClientChannel(index);
|
||||||
|
if (eChannel != null) {
|
||||||
|
eChannel.write(msg);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,120 @@
|
||||||
|
package test;
|
||||||
|
|
||||||
|
import java.util.Hashtable;
|
||||||
|
|
||||||
|
import memcached.MemcachedClient;
|
||||||
|
|
||||||
|
import org.apache.log4j.PropertyConfigurator;
|
||||||
|
/**
|
||||||
|
* Test the communication between R-Memcached and Memcached server in one cache node.
|
||||||
|
* @author Yanran Lu
|
||||||
|
*/
|
||||||
|
public class MainTest {
|
||||||
|
// store results from threads
|
||||||
|
private static Hashtable<Integer, StringBuilder> threadInfo = new Hashtable<Integer, StringBuilder>();
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
PropertyConfigurator.configure(MainTest.class.getClass()
|
||||||
|
.getResource("/").getPath()
|
||||||
|
+ "log4j.properties");
|
||||||
|
// String[] serverlist = { "127.0.0.1:20000", "127.0.0.1:20001" ,
|
||||||
|
// "127.0.0.1:20002" };
|
||||||
|
|
||||||
|
int threads = Integer.parseInt(args[0]); // the number of threads
|
||||||
|
int runs = Integer.parseInt(args[1]); // how many request send out by a thread
|
||||||
|
int Nums = Integer.parseInt(args[2]); // the total size of data
|
||||||
|
int size = Integer.parseInt(args[3]); // the size of a data
|
||||||
|
|
||||||
|
// get object to store
|
||||||
|
byte[] obj = new byte[size];
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
obj[i] = '1';
|
||||||
|
}
|
||||||
|
String value = new String(obj);
|
||||||
|
|
||||||
|
String[] keys = new String[size];
|
||||||
|
for (int i = 0; i < Nums; i++) {
|
||||||
|
keys[i] = "key" + i;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < threads; i++) {
|
||||||
|
bench b = new bench(runs, Nums, i, value, keys);
|
||||||
|
b.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
while (i < threads) {
|
||||||
|
if (threadInfo.containsKey(new Integer(i))) {
|
||||||
|
System.out.println(threadInfo.get(new Integer(i)));
|
||||||
|
i++;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test code per thread.
|
||||||
|
*/
|
||||||
|
private static class bench extends Thread {
|
||||||
|
private int runs;
|
||||||
|
private int threadNum;
|
||||||
|
private String object;
|
||||||
|
private String[] keys;
|
||||||
|
private int size;
|
||||||
|
private int nums;
|
||||||
|
|
||||||
|
public bench(int runs, int nums, int threadNum, String object,
|
||||||
|
String[] keys) {
|
||||||
|
this.runs = runs;
|
||||||
|
this.threadNum = threadNum;
|
||||||
|
this.object = object;
|
||||||
|
this.keys = keys;
|
||||||
|
this.size = object.length();
|
||||||
|
this.nums = nums;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
StringBuilder result = new StringBuilder();
|
||||||
|
|
||||||
|
// get client instance
|
||||||
|
MemcachedClient mc = new MemcachedClient("127.0.0.1:20000");
|
||||||
|
mc.setCompressEnable(false);
|
||||||
|
mc.setCompressThreshold(0);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(0);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
// time deletes
|
||||||
|
long start = System.nanoTime();
|
||||||
|
randReadWrite(mc, 0.8);
|
||||||
|
long elapse = (System.nanoTime() - start) / 1000000;
|
||||||
|
float avg = (float) elapse / runs;
|
||||||
|
result.append("\nthread " + threadNum + ": runs: " + runs
|
||||||
|
+ " read or write of obj " + (size / 1024)
|
||||||
|
+ "KB -- avg time per req " + avg + " ms (total: " + elapse
|
||||||
|
+ " ms)");
|
||||||
|
|
||||||
|
threadInfo.put(new Integer(threadNum), result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void randReadWrite(MemcachedClient mc, double scale) {
|
||||||
|
for (int i = 0; i < runs; i++) {
|
||||||
|
if (Math.random() < scale) {
|
||||||
|
mc.get(keys[i % nums]);
|
||||||
|
} else {
|
||||||
|
mc.set(keys[i % nums], object);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue