Merge branch 'Memcached_Client'

This commit is contained in:
lyr90329 2014-08-13 17:57:36 +08:00
commit 09199591ca
7 changed files with 321 additions and 367 deletions

View File

@ -16,5 +16,8 @@ public enum EMSGID
nm_read, nm_read,
nm_write_1, nm_write_1,
nm_write_1_res, nm_write_1_res,
nm_write_2 nm_write_2,
nr_stats,
nr_stats_res
} }

View File

@ -4,6 +4,7 @@ import java.lang.reflect.InvocationTargetException;
import messageBody.memcachedmsg.nm_Connected; import messageBody.memcachedmsg.nm_Connected;
import messageBody.requestMsg.nr_Connected_mem_back; import messageBody.requestMsg.nr_Connected_mem_back;
import messageBody.requestMsg.nr_Read_res; import messageBody.requestMsg.nr_Read_res;
import messageBody.requestMsg.nr_Stats_res;
import messageBody.requestMsg.nr_write_res; import messageBody.requestMsg.nr_write_res;
import com.google.protobuf.GeneratedMessage; import com.google.protobuf.GeneratedMessage;
@ -16,6 +17,7 @@ public class RegisterHandler
initHandler(EMSGID.nm_connected.ordinal(), nm_Connected.class); initHandler(EMSGID.nm_connected.ordinal(), nm_Connected.class);
initHandler(EMSGID.nr_connected_mem_back.ordinal(), nr_Connected_mem_back.class); initHandler(EMSGID.nr_connected_mem_back.ordinal(), nr_Connected_mem_back.class);
initHandler(EMSGID.nr_read_res.ordinal(), nr_Read_res.class); initHandler(EMSGID.nr_read_res.ordinal(), nr_Read_res.class);
initHandler(EMSGID.nr_stats_res.ordinal(), nr_Stats_res.class);
initHandler(EMSGID.nr_write_res.ordinal(), nr_write_res.class); initHandler(EMSGID.nr_write_res.ordinal(), nr_write_res.class);
} }

View File

@ -1,35 +1,33 @@
package server; package com.myself.server;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.jboss.netty.handler.codec.frame.FrameDecoder;
import server.NetMsg;
import common.EMSGID; import common.EMSGID;
public class MDecoder extends FrameDecoder public class MDecoder extends FrameDecoder
{ {
@Override @Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) if (buffer.readableBytes() < 8) {
{ return null;// (1)
return null;//(1) }
} int dataLength = buffer.getInt(buffer.readerIndex());
int dataLength = buffer.getInt(buffer.readerIndex()); if (buffer.readableBytes() < dataLength + 4) {
if (buffer.readableBytes() < dataLength + 4) return null;// (2)
{ }
return null;//(2)
} buffer.skipBytes(4);// (3)
int id = buffer.readInt();
buffer.skipBytes(4);//(3) int nodeRoute = buffer.readInt();
int id = buffer.readInt(); byte[] decoded = new byte[dataLength - 8];
byte[] decoded = new byte[dataLength-4];
buffer.readBytes(decoded);
buffer.readBytes(decoded); NetMsg msg = new NetMsg(decoded, id);// (4)
NetMsg msg = new NetMsg(decoded, id);//(4) msg.setMsgID(EMSGID.values()[id]);
msg.setMsgID(EMSGID.values()[id]); msg.setNodeRoute(nodeRoute);
return msg; return msg;
} }
} }

View File

@ -1,4 +1,4 @@
package server; package com.myself.server;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
@ -6,23 +6,22 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import server.NetMsg;
public class MEncoder extends OneToOneEncoder public class MEncoder extends OneToOneEncoder
{ {
@Override @Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception
{ {
if (!(msg instanceof NetMsg)) if (!(msg instanceof NetMsg))
{ {
return msg;//(1) return msg;//(1)
} }
NetMsg res = (NetMsg)msg; NetMsg res = (NetMsg)msg;
byte[] data = res.getBytes(); byte[] data = res.getBytes();
int dataLength = data.length+4; int dataLength = data.length+8;
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2) ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)
buf.writeInt(dataLength); buf.writeInt(dataLength);
buf.writeInt(res.msgID.ordinal()); buf.writeInt(res.msgID.ordinal());
buf.writeInt(res.getNodeRoute());
buf.writeBytes(data); buf.writeBytes(data);
return buf;//(3) return buf;//(3)
} }

View File

@ -1,7 +1,6 @@
package server; package com.myself.server;
import com.google.protobuf.GeneratedMessage; import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLite;
import server.NetMsg;
import common.EMSGID; import common.EMSGID;
import common.MessageManager; import common.MessageManager;
@ -10,11 +9,14 @@ public class NetMsg
{ {
EMSGID msgID; EMSGID msgID;
MessageLite messageLite; MessageLite messageLite;
int nodeRoute;
private NetMsg(){}; private NetMsg(){};
public static NetMsg newMessage() public static NetMsg newMessage()
{ {
return new NetMsg(); NetMsg msg = new NetMsg();
msg.setNodeRoute(0);
return msg;
} }
NetMsg(byte[] decoded, int id) throws Exception NetMsg(byte[] decoded, int id) throws Exception
@ -49,5 +51,10 @@ public class NetMsg
{ {
this.messageLite = builder.build(); this.messageLite = builder.build();
} }
public int getNodeRoute() {
return nodeRoute;
}
public void setNodeRoute(int nodeRoute) {
this.nodeRoute = nodeRoute;
}
} }

View File

@ -1,10 +1,8 @@
package server; package com.myself.server;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Properties;
import java.util.Random; import java.util.Random;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
@ -12,51 +10,44 @@ import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.PropertyConfigurator;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.Element; import org.w3c.dom.Element;
import org.w3c.dom.Node; import org.w3c.dom.Node;
import org.w3c.dom.NodeList; import org.w3c.dom.NodeList;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import r_memcached.MemcachedMgr; import com.myself.memcached.MemcachedMgr;
import server.ClientConfig;
import server.Server;
import common.RegisterHandler; import common.RegisterHandler;
public class WebServerMain
{ public class WebServerMain {
static HashMap<Integer, ClientConfig> m_mapMemcachedClient; static HashMap<Integer, ClientConfig> m_mapMemcachedClient;
public static boolean initConfig() static int threadCount = 0;
{
public static boolean initConfig() {
m_mapMemcachedClient = new HashMap<Integer, ClientConfig>(); m_mapMemcachedClient = new HashMap<Integer, ClientConfig>();
File f = new File(System.getProperty("user.dir")); File f = new File(System.getProperty("user.dir"));
String path = f.getPath() + File.separator + "bin" + File.separator; String path = f.getPath() + File.separator + "bin" + File.separator;
readClientsXML(path+"client.xml"); readClientsXML(path + "client.xml");
try {
Properties properties = new Properties();
properties.load(new FileInputStream(path+"config.properties"));
MemcachedMgr.nCopyNode = Integer.parseInt(properties.getProperty("replicasNum"));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true; return true;
} }
private static class bench extends Thread private static class bench extends Thread {
{
private int runs; private int runs;
@SuppressWarnings("unused")
private int threadNum; private int threadNum;
private String object; private String object;
private String[] keys; private String[] keys;
@SuppressWarnings("unused")
private int size; private int size;
private int nums; private int nums;
private double rate; private double rate;
public bench(int runs,int nums, int threadNum, String object, String[] keys, double rate) public bench(int runs, int nums, int threadNum, String object,
{ String[] keys, double rate) {
this.runs = runs; this.runs = runs;
this.threadNum = threadNum; this.threadNum = threadNum;
this.object = object; this.object = object;
@ -66,13 +57,10 @@ public class WebServerMain
this.rate = rate; this.rate = rate;
} }
public void run() public void run() {
{ try {
try
{
Thread.sleep(10); Thread.sleep(10);
} catch (InterruptedException e) } catch (InterruptedException e) {
{
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
@ -81,28 +69,22 @@ public class WebServerMain
time = System.nanoTime(); time = System.nanoTime();
randReadWrite(rate); randReadWrite(rate);
time = System.nanoTime() - time; time = System.nanoTime() - time;
System.out.println(time/1000000000.0f); System.err.println("cost: " + time / 1000000000.0f);
WebServerMain.threadCount --;
} }
public void randReadWrite(double scale) public void randReadWrite(double scale) {
{
Random randNum = new Random(); Random randNum = new Random();
for (int i = 0; i < runs; i++) for (int i = 0; i < runs; i++) {
{ if (Math.random() < scale) {
if (Math.random()<scale)
{
webSession.getInstance().get(keys[randNum.nextInt(nums)]); webSession.getInstance().get(keys[randNum.nextInt(nums)]);
} } else {
else
{
webSession.getInstance().set(keys[randNum.nextInt(nums)], object); webSession.getInstance().set(keys[randNum.nextInt(nums)], object);
} }
try try {
{
Thread.sleep((long) 0.00001); Thread.sleep((long) 0.00001);
} catch (InterruptedException e) } catch (InterruptedException e) {
{
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
@ -110,107 +92,138 @@ public class WebServerMain
} }
} }
public static void main(String[] args) public static void main(String[] args) {
{ for (int i =0; i< 10; i++){
PropertyConfigurator.configure(System.getProperty("user.dir")+"/bin/log4j.properties");//加载.properties文件 WebServerMain.run(args);
}
}
public static void run(String[] args) {
PropertyConfigurator.configure(System.getProperty("user.dir")
+ "/bin/log4j.properties");// 加载.properties文件
initConfig(); initConfig();
RegisterHandler.initHandler(); RegisterHandler.initHandler();
webSession.getInstance().start(); webSession.getInstance().start();
// client¹ÜÀí // client¹ÜÀí
MemcachedMgr clientMgr = MemcachedMgr.getInstance(); MemcachedMgr clientMgr = MemcachedMgr.getInstance();
clientMgr.init(m_mapMemcachedClient); clientMgr.init(m_mapMemcachedClient);
Server requestServer = Server.getInstance(); Server requestServer = Server.getInstance();
requestServer.init(8888); requestServer.init(8888);
int threads = Integer.parseInt(args[0]);//线程数 int threads = Integer.parseInt(args[0]);// 线程数
int runs = Integer.parseInt(args[1]); //执行次数 WebServerMain.threadCount = threads;
int Nums = Integer.parseInt(args[2]); // key数目 int runs = Integer.parseInt(args[1]); // 执行次数
int size = Integer.parseInt(args[3]); // value大小 int Nums = Integer.parseInt(args[2]); // key数目
double rate = Double.parseDouble(args[4]); //读写比例 int size = Integer.parseInt(args[3]); // value大小
double rate = Double.parseDouble(args[4]); // 读写比例
// get object to store // get object to store
byte[] obj = new byte[size]; byte[] obj = new byte[size];
for (int i = 0; i < size; i++) for (int i = 0; i < size; i++) {
{
obj[i] = '1'; obj[i] = '1';
} }
String value = new String(obj); String value = new String(obj);
String[] keys = new String[Nums]; String[] keys = new String[Nums];
for (int i = 0; i < Nums; i++) for (int i = 0; i < Nums; i++) {
{
keys[i] = "" + i; keys[i] = "" + i;
} }
for (int i = 0; i < threads; i++) for (int i = 0; i < threads; i++) {
{
bench b = new bench(runs, Nums, i, value, keys, rate); bench b = new bench(runs, Nums, i, value, keys, rate);
b.start(); b.start();
} }
while (WebServerMain.threadCount != 0){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
System.out.println(webSession.results.get(0));
} catch (JSONException e1) {
e1.printStackTrace();
}
webSession.getInstance().stats();
while (webSession.stats.size() != 4 || webSession.results.length() < 3000){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(webSession.results.length());
// try {
// JSONObject jStats = new JSONObject(webSession.stats.get(2));
// System.out.println(jStats);
// System.out.println(jStats.get("cmd_get"));
// System.out.println(jStats.get("cmd_set"));
// } catch (JSONException e) {
// e.printStackTrace();
// }
requestServer.stop();
//webSession.session = null;
webSession.results = new JSONArray();
} }
// ÁÈ¡memcached clientÅäÖÃ // ÁÈ¡memcached clientÅäÖÃ
public static boolean readClientsXML(String str) public static boolean readClientsXML(String str) {
{ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance(); try {
try { factory.setIgnoringElementContentWhitespace(true);
factory.setIgnoringElementContentWhitespace(true);
DocumentBuilder db = factory.newDocumentBuilder();
DocumentBuilder db=factory.newDocumentBuilder(); Document xmldoc = db.parse(new File(str));
Document xmldoc=db.parse(new File(str)); Element elmtInfo = xmldoc.getDocumentElement();
Element elmtInfo = xmldoc.getDocumentElement(); NodeList nodes = elmtInfo.getChildNodes();
NodeList nodes = elmtInfo.getChildNodes(); for (int i = 0; i < nodes.getLength(); i++) {
for (int i = 0; i < nodes.getLength(); i++) Node result = nodes.item(i);
{ if (result.getNodeType() == Node.ELEMENT_NODE
Node result = nodes.item(i); && result.getNodeName().equals("client")) {
if (result.getNodeType() == Node.ELEMENT_NODE && result.getNodeName().equals("client")) NodeList ns = result.getChildNodes();
{ ClientConfig localClient = new ClientConfig();
NodeList ns = result.getChildNodes(); int m = 0;
ClientConfig localClient = new ClientConfig(); for (int j = 0; j < ns.getLength(); j++) {
int m=0; Node record = ns.item(j);
for (int j = 0; j < ns.getLength(); j++) if (record.getNodeType() == Node.ELEMENT_NODE) {
{ if (record.getNodeName().equals("id")) {
Node record = ns.item(j); m++;
if (record.getNodeType() == Node.ELEMENT_NODE) localClient.id = Integer.decode(record
{ .getTextContent());
if (record.getNodeName().equals("id")) } else if (record.getNodeName().equals("host")) {
{ m++;
m++; localClient.host = record.getTextContent();
localClient.id = Integer.decode(record.getTextContent()); } else if (record.getNodeName().equals(
} "client_port")) {
else if (record.getNodeName().equals("host")) m++;
{ localClient.client_port = Integer.decode(record
m++; .getTextContent());
localClient.host = record.getTextContent(); } else if (record.getNodeName().equals("memcached")) {
}
else if (record.getNodeName().equals("client_port"))
{
m++;
localClient.client_port = Integer.decode(record.getTextContent());
}
else if (record.getNodeName().equals("memcached"))
{
m++; m++;
localClient.memcached = record.getTextContent(); localClient.memcached = record.getTextContent();
} }
} }
} }
if(m==4) if (m == 4) {
{ m_mapMemcachedClient.put(localClient.id, localClient);
m_mapMemcachedClient.put(localClient.id, localClient); }
} }
} }
} } catch (ParserConfigurationException e) {
} catch (ParserConfigurationException e) { e.printStackTrace();
e.printStackTrace(); } catch (SAXException e) {
} catch (SAXException e) { e.printStackTrace();
e.printStackTrace(); } catch (IOException e) {
} catch (IOException e) { e.printStackTrace();
e.printStackTrace(); }
} return true;
return true; }
}
} }

View File

@ -1,5 +1,6 @@
package server; package com.myself.server;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -11,303 +12,235 @@ import messageBody.memcachedmsg.nm_Connected_web_back;
import messageBody.requestMsg.nr_Connected_mem_back; import messageBody.requestMsg.nr_Connected_mem_back;
import messageBody.requestMsg.nr_Read; import messageBody.requestMsg.nr_Read;
import messageBody.requestMsg.nr_Read_res; import messageBody.requestMsg.nr_Read_res;
import messageBody.requestMsg.nr_Stats;
import messageBody.requestMsg.nr_Stats_res;
import messageBody.requestMsg.nr_write; import messageBody.requestMsg.nr_write;
import messageBody.requestMsg.nr_write_res; import messageBody.requestMsg.nr_write_res;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.ConcurrentHashMap; import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.json.JSONArray;
import database.DBMessage; import com.myself.memcached.MemcachedMgr;
import r_memcached.MemcachedMgr;
import server.NetMsg;
import common.EMSGID; import common.EMSGID;
public class webSession implements Runnable public class webSession implements Runnable {
{
ConcurrentLinkedQueue<MessageEvent> recvQueue = new ConcurrentLinkedQueue<MessageEvent>(); ConcurrentLinkedQueue<MessageEvent> recvQueue = new ConcurrentLinkedQueue<MessageEvent>();
Map<Integer, Channel> MemcachedChannelMap = new ConcurrentHashMap<Integer, Channel>(); Map<Integer, Channel> MemcachedChannelMap = new ConcurrentHashMap<Integer, Channel>();
Map<Integer, Channel> RequestChannelMap = new ConcurrentHashMap<Integer, Channel>(); Map<Integer, Channel> RequestChannelMap = new ConcurrentHashMap<Integer, Channel>();
static webSession session = null; public static webSession session = null;
public Channel curChannel; public Channel curChannel;
public long totalTime = 0; public long totalTime = 0;
public long ticks =0; public long ticks = 0;
//public static Logger log = LoggerUtil.getInstance(); public static JSONArray results = new JSONArray();
public static Map<Integer, String> stats = new HashMap<>();
// public static Logger log = LoggerUtil.getInstance();
public static Logger log = Logger.getLogger(webSession.class.getName()); public static Logger log = Logger.getLogger(webSession.class.getName());
public static webSession getInstance() public static webSession getInstance() {
{ if (session == null) {
if (session == null)
{
session = new webSession(); session = new webSession();
} }
return session; return session;
} }
public void start() public void start() {
{ // DBSession.getInstance().start(); Êý¾Ý¿âÁ¬½Ó
//DBSession.getInstance().start(); 数据库连接
new Thread(session).start(); new Thread(session).start();
System.out.println("session start"); System.out.println("session start");
} }
// Ìí¼ÓrequestÁ¬½Ó // Ìí¼ÓrequestÁ¬½Ó
public void addRequestChannel(Channel ch) public void addRequestChannel(Channel ch) {
{
RequestChannelMap.put(ch.getId(), ch); RequestChannelMap.put(ch.getId(), ch);
} }
// ɾ³ýrequestÁ¬½Ó // ɾ³ýrequestÁ¬½Ó
public void removeRequestChannel(Channel ch) public void removeRequestChannel(Channel ch) {
{ RequestChannelMap.remove(ch.getId());
RequestChannelMap.remove(ch.getId());
} }
// »ñµÃrequestÁ¬½Ó // »ñµÃrequestÁ¬½Ó
public Channel getRequestChannel(Integer id) public Channel getRequestChannel(Integer id) {
{
return RequestChannelMap.get(id); return RequestChannelMap.get(id);
} }
// Ôö¼ÓclientÁ¬½Ó // Ôö¼ÓclientÁ¬½Ó
public void addClientChannel(Integer num,Channel ch) public void addClientChannel(Integer num, Channel ch) {
{
MemcachedChannelMap.put(num, ch); MemcachedChannelMap.put(num, ch);
} }
public Channel getClientChannel(Integer id)
{ public Channel getClientChannel(Integer id) {
return MemcachedChannelMap.get(id); return MemcachedChannelMap.get(id);
} }
// ɾµôclientÁ¬½Ó // ɾµôclientÁ¬½Ó
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public void removeClientChannel(Channel ch) public void removeClientChannel(Channel ch) {
{
Iterator iter = MemcachedChannelMap.entrySet().iterator(); Iterator iter = MemcachedChannelMap.entrySet().iterator();
while (iter.hasNext()) while (iter.hasNext()) {
{
Entry entry = (Entry) iter.next(); Entry entry = (Entry) iter.next();
if ((Channel)entry.getValue() == ch) if ((Channel) entry.getValue() == ch) {
{ MemcachedChannelMap.remove((Integer) entry.getKey());
MemcachedChannelMap.remove((Integer)entry.getKey());
break; break;
} }
} }
} }
//////////////////////////////////////////////////////////
public void run() // ////////////////////////////////////////////////////////
{ public void run() {
while(true) while (true) {
{
MessageEvent event = recvQueue.poll(); MessageEvent event = recvQueue.poll();
while(event != null) while (event != null) {
{
handle(event); handle(event);
event = recvQueue.poll(); event = recvQueue.poll();
} }
try try {
{
Thread.sleep(0); Thread.sleep(0);
} catch (InterruptedException e) } catch (InterruptedException e) {
{
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
public int gethashMem(String key) {
public int gethashMem(String key) return Math.abs(key.hashCode() % MemcachedMgr.getInstance().getSize());
{
return Math.abs(key.hashCode()%MemcachedMgr.getInstance().getSize());
} }
public void handle(MessageEvent e) public void handle(MessageEvent e) {
{ NetMsg msg = (NetMsg) e.getMessage();
NetMsg msg = (NetMsg)e.getMessage(); switch (msg.getMsgID()) {
switch (msg.getMsgID()) case nm_connected: {
{
case nm_connected:
{
nm_Connected msgBody = msg.getMessageLite(); nm_Connected msgBody = msg.getMessageLite();
addClientChannel(msgBody.getNum(), e.getChannel()); addClientChannel(msgBody.getNum(), e.getChannel());
nm_Connected_web_back.Builder builder = nm_Connected_web_back.newBuilder(); nm_Connected_web_back.Builder builder = nm_Connected_web_back.newBuilder();
NetMsg send = NetMsg.newMessage(); NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder); send.setMessageLite(builder);
send.setMsgID(EMSGID.nm_connected_web_back); send.setMsgID(EMSGID.nm_connected_web_back);
sendMsg(e.getChannel(), send); sendMsg(e.getChannel(), send);
} }
break; break;
case nr_connected_mem_back: case nr_connected_mem_back: {
{ nr_Connected_mem_back msgLite = msg.getMessageLite();
nr_Connected_mem_back msgLite= msg.getMessageLite(); addClientChannel(msgLite.getMemID(), e.getChannel());
addClientChannel(msgLite.getMemID(), e.getChannel());
} }
break; break;
case nr_read_res: case nr_stats_res: {
{ nr_Stats_res msgBody = msg.getMessageLite();
stats.put(msg.getNodeRoute(), msgBody.getValue());
//System.out.println("stats:\n" + msgBody.getValue());
}
break;
case nr_read_res: {
nr_Read_res msgBody = msg.getMessageLite(); nr_Read_res msgBody = msg.getMessageLite();
// System.out.println("key:"+msgBody.getKey()+" value:"+msgBody.getValue()); Map<String, String> readResult = new HashMap<String, String>();
readResult.put("type", "GET");
// System.out.println(String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0)); readResult.put("node", ""+msg.getNodeRoute());
// log.log(Priority.INFO, String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0)); readResult.put("key", msgBody.getKey());
// log.log(Level.INFO, String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0)); readResult.put("value", msgBody.getValue());
// System.err.println((System.nanoTime()-msgBody.getTime())/1000000.0); results.put(readResult);
// if (ticks==0) //System.out.println("key:"+msgBody.getKey()+" value:"+msgBody.getValue());
// {
// totalTime = System.currentTimeMillis();
// }
// totalTime += System.nanoTime()-msgBody.getTime();
// ticks++;
// if (ticks == 1000)
// {
// System.out.println(totalTime/1000000000.0f);
//
// totalTime = 0;
// ticks = 0;
// }
// if (msgBody.getValue().isEmpty()) //读数据库
// {
// DBMessage dbMsg=new DBMessage();
// dbMsg.mode = DBMessage.mode_query;
// dbMsg.key = msgBody.getKey();
//
// DBSession.getInstance().addDBMessage(dbMsg);
// }
// else
// {
// nc_ReadRes.Builder builder = nc_ReadRes.newBuilder();
// builder.setKey(msgBody.getKey());
// builder.setValue(msgBody.getValue());
//
// NetMsg send = NetMsg.newMessage();
// send.setMessageLite(builder);
// send.setMsgID(EMSGID.nc_read_res);
//
// //sendMsg(getRequestChannel(msgBody.getClientid()), send);
// }
} }
break; break;
case nr_write_res: case nr_write_res: {
{
nr_write_res msgBody = msg.getMessageLite(); nr_write_res msgBody = msg.getMessageLite();
// System.out.println("key:"+msgBody.getKey()+" value:"+msgBody.getValue()); Map<String, String> readResult = new HashMap<String, String>();
readResult.put("type", "SET");
//DBMessage dbMsg = new DBMessage(); //异步写数据库 readResult.put("node", ""+msg.getNodeRoute());
//dbMsg.mode = DBMessage.mode_set; readResult.put("key", msgBody.getKey());
//dbMsg.key = msgBody.getKey(); readResult.put("value", msgBody.getValue());
//dbMsg.value = msgBody.getValue(); results.put(readResult);
//DBSession.getInstance().addDBMessage(dbMsg); //System.out.println("key:"+msgBody.getKey()+" value:"+msgBody.getValue());
// log.log(Level.INFO, String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0));
//System.err.println();
// System.out.println(String.valueOf((System.nanoTime()-msgBody.getTime())/1000000.0));
//
// if (ticks==0)
// {
// totalTime = System.currentTimeMillis();
// }
// totalTime += System.nanoTime()-msgBody.getTime();
// ticks++;
// if (ticks == 1000)
// {
// System.out.println(totalTime/1000000000.0f);
//
// totalTime = 0;
// ticks = 0;
// }
// nc_WriteRes.Builder builder = nc_WriteRes.newBuilder();
// builder.setKey(msgBody.getKey());
// builder.setValue(msgBody.getValue());
//
// NetMsg send = NetMsg.newMessage();
// send.setMessageLite(builder);
// send.setMsgID(EMSGID.nc_write_res);
//sendMsg(getRequestChannel(msgBody.getClientid()), send);
} }
break; break;
default: default:
System.err.println(msg.getMsgID().toString()); System.err.println(msg.getMsgID().toString());
break; break;
} }
//log.log(Level.INFO, msg.getMsgID().toString());
} }
public void addSession(MessageEvent e)
{ public void addSession(MessageEvent e) {
recvQueue.offer(e); recvQueue.offer(e);
} }
public boolean sendAllMsg(Integer hash, NetMsg msg)
{ public boolean sendAllMsg(Integer hash, NetMsg msg) {
for (int i = 0; i < MemcachedMgr.nCopyNode; i++) for (int i = 0; i < MemcachedMgr.nCopyNode; i++) {
{ Channel eChannel = getClientChannel(hash + i);
Channel eChannel = getClientChannel(hash+i); if (eChannel != null) {
if (eChannel != null)
{
sendMsg(eChannel, msg); sendMsg(eChannel, msg);
return false; return false;
} }
} }
return true; return true;
} }
public boolean randSendMsg2Memcached(Integer hash, NetMsg msg) public boolean allSendMsg2Memcached(NetMsg msg) {
{ int size = MemcachedChannelMap.size();
for (int i = 0; i < size; i++) {
Channel eChannel = getClientChannel(i);
if (eChannel != null) {
sendMsg(eChannel, msg);
}
}
return true;
}
public boolean randSendMsg2Memcached(Integer hash, NetMsg msg) {
Random random = new Random(); Random random = new Random();
int index = random.nextInt(MemcachedMgr.nCopyNode); int index = random.nextInt(MemcachedMgr.nCopyNode);
for (int i = 0; i < MemcachedMgr.nCopyNode; i++) for (int i = 0; i < MemcachedMgr.nCopyNode; i++) {
{ int num = (hash + i + index + MemcachedMgr.getInstance().getSize())
int num = (hash+i+index+MemcachedMgr.getInstance().getSize())%MemcachedMgr.getInstance().getSize(); % MemcachedMgr.getInstance().getSize();
Channel eChannel = getClientChannel(num); Channel eChannel = getClientChannel(num);
if (eChannel != null) if (eChannel != null) {
{
sendMsg(eChannel, msg); sendMsg(eChannel, msg);
return true; return true;
} }
} }
System.err.println("SendMsg wrong : randSendMsg2Memcached in webSession line 174");
System.exit(-1);
return false; return false;
} }
public boolean SendMsg2Leader(Integer hash, NetMsg msg) public boolean SendMsg2Leader(Integer hash, NetMsg msg) {
{ for (int i = 0; i < MemcachedMgr.nCopyNode; i++) {
for (int i = 0; i < MemcachedMgr.nCopyNode; i++) int index = (hash + i + MemcachedMgr.getInstance().getSize())
{ % MemcachedMgr.getInstance().getSize();
int index = (hash+i+MemcachedMgr.getInstance().getSize())
%MemcachedMgr.getInstance().getSize();
Channel eChannel = getClientChannel(index); Channel eChannel = getClientChannel(index);
if (eChannel != null) if (eChannel != null) {
{
sendMsg(eChannel, msg); sendMsg(eChannel, msg);
return true; return true;
} }
} }
return false; return false;
} }
public void sendMsg(Channel ch, NetMsg msg) public void sendMsg(Channel ch, NetMsg msg) {
{ try {
try
{
ch.write(msg); ch.write(msg);
} } catch (Throwable e) {
catch (Throwable e) // log.log(Level.WARNING, "send msg fail");
{
//log.log(Level.WARNING, "send msg fail");
} }
} }
public boolean stats() {
public boolean get(String key) nr_Stats.Builder builder = nr_Stats.newBuilder();
{ builder.setTime(System.nanoTime());
NetMsg msg = NetMsg.newMessage();
msg.setMessageLite(builder);
msg.setMsgID(EMSGID.nr_stats);
allSendMsg2Memcached(msg);
return true;
}
public boolean get(String key) {
nr_Read.Builder builder = nr_Read.newBuilder(); nr_Read.Builder builder = nr_Read.newBuilder();
builder.setKey(key); builder.setKey(key);
builder.setTime(System.nanoTime()); builder.setTime(System.nanoTime());
@ -317,9 +250,8 @@ public class webSession implements Runnable
randSendMsg2Memcached(key.hashCode(), msg); randSendMsg2Memcached(key.hashCode(), msg);
return true; return true;
} }
public boolean set(String key, String value) public boolean set(String key, String value) {
{
nr_write.Builder builder = nr_write.newBuilder(); nr_write.Builder builder = nr_write.newBuilder();
builder.setKey(key); builder.setKey(key);
builder.setValue(value); builder.setValue(value);