find node

This commit is contained in:
lyr90329 2014-08-13 16:49:03 +08:00
parent 170f8b3edf
commit b7cc9cee53
1 changed files with 245 additions and 0 deletions

View File

@ -0,0 +1,245 @@
package server;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import messageBody.memcachedmsg.nm_Connected;
import messageBody.memcachedmsg.nm_Connected_web_back;
import messageBody.requestMsg.nr_Connected_mem_back;
import messageBody.requestMsg.nr_Read;
import messageBody.requestMsg.nr_Read_res;
import messageBody.requestMsg.nr_write;
import messageBody.requestMsg.nr_write_res;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import database.DBMessage;
import r_memcached.MemcachedMgr;
import server.NetMsg;
import common.EMSGID;
public class webSession implements Runnable
{
ConcurrentLinkedQueue<MessageEvent> recvQueue = new ConcurrentLinkedQueue<MessageEvent>();
Map<Integer, Channel> MemcachedChannelMap = new ConcurrentHashMap<Integer, Channel>();
Map<Integer, Channel> RequestChannelMap = new ConcurrentHashMap<Integer, Channel>();
static webSession session = null;
public Channel curChannel;
public long totalTime = 0;
public long ticks =0;
//public static Logger log = LoggerUtil.getInstance();
public static Logger log = Logger.getLogger(webSession.class.getName());
public static webSession getInstance()
{
if (session == null)
{
session = new webSession();
}
return session;
}
public void start()
{
//DBSession.getInstance().start();
new Thread(session).start();
System.out.println("session start");
}
// Ìí¼ÓrequestÁ¬½Ó
public void addRequestChannel(Channel ch)
{
RequestChannelMap.put(ch.getId(), ch);
}
// ɾ³ýrequestÁ¬½Ó
public void removeRequestChannel(Channel ch)
{
RequestChannelMap.remove(ch.getId());
}
// »ñµÃrequestÁ¬½Ó
public Channel getRequestChannel(Integer id)
{
return RequestChannelMap.get(id);
}
// Ôö¼ÓclientÁ¬½Ó
public void addClientChannel(Integer num,Channel ch)
{
MemcachedChannelMap.put(num, ch);
}
public Channel getClientChannel(Integer id)
{
return MemcachedChannelMap.get(id);
}
// ɾµôclientÁ¬½Ó
@SuppressWarnings("rawtypes")
public void removeClientChannel(Channel ch)
{
Iterator iter = MemcachedChannelMap.entrySet().iterator();
while (iter.hasNext())
{
Entry entry = (Entry) iter.next();
if ((Channel)entry.getValue() == ch)
{
MemcachedChannelMap.remove((Integer)entry.getKey());
break;
}
}
}
//////////////////////////////////////////////////////////
public void run()
{
while(true)
{
MessageEvent event = recvQueue.poll();
while(event != null)
{
handle(event);
event = recvQueue.poll();
}
try
{
Thread.sleep(0);
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public int gethashMem(String key)
{
return Math.abs(key.hashCode()%MemcachedMgr.getInstance().getSize());
}
public void handle(MessageEvent e)
{
NetMsg msg = (NetMsg)e.getMessage();
switch (msg.getMsgID())
{
case nm_connected:
{
nm_Connected msgBody = msg.getMessageLite();
addClientChannel(msgBody.getNum(), e.getChannel());
nm_Connected_web_back.Builder builder = nm_Connected_web_back.newBuilder();
NetMsg send = NetMsg.newMessage();
send.setMessageLite(builder);
send.setMsgID(EMSGID.nm_connected_web_back);
sendMsg(e.getChannel(), send);
}
break;
case nr_connected_mem_back:
{
nr_Connected_mem_back msgLite= msg.getMessageLite();
addClientChannel(msgLite.getMemID(), e.getChannel());
}
break;
default:
System.err.println(msg.getMsgID().toString());
break;
}
//log.log(Level.INFO, msg.getMsgID().toString());
}
public void addSession(MessageEvent e)
{
recvQueue.offer(e);
}
public boolean sendAllMsg(Integer hash, NetMsg msg)
{
for (int i = 0; i < MemcachedMgr.nCopyNode; i++)
{
Channel eChannel = getClientChannel(hash+i);
if (eChannel != null)
{
sendMsg(eChannel, msg);
return false;
}
}
return true;
}
public boolean randSendMsg2Memcached(Integer hash, NetMsg msg)
{
Random random = new Random();
int index = random.nextInt(MemcachedMgr.nCopyNode);
for (int i = 0; i < MemcachedMgr.nCopyNode; i++)
{
int num = (hash+i+index+MemcachedMgr.getInstance().getSize())%MemcachedMgr.getInstance().getSize();
Channel eChannel = getClientChannel(num);
if (eChannel != null)
{
sendMsg(eChannel, msg);
return true;
}
}
return false;
}
public boolean SendMsg2Leader(Integer hash, NetMsg msg)
{
for (int i = 0; i < MemcachedMgr.nCopyNode; i++)
{
int index = (hash+i+MemcachedMgr.getInstance().getSize())
%MemcachedMgr.getInstance().getSize();
Channel eChannel = getClientChannel(index);
if (eChannel != null)
{
sendMsg(eChannel, msg);
return true;
}
}
return false;
}
public void sendMsg(Channel ch, NetMsg msg)
{
try
{
ch.write(msg);
}
catch (Throwable e)
{
//log.log(Level.WARNING, "send msg fail");
}
}
public boolean get(String key)
{
nr_Read.Builder builder = nr_Read.newBuilder();
builder.setKey(key);
builder.setTime(System.nanoTime());
NetMsg msg = NetMsg.newMessage();
msg.setMessageLite(builder);
msg.setMsgID(EMSGID.nr_read);
randSendMsg2Memcached(key.hashCode(), msg);
return true;
}
public boolean set(String key, String value)
{
nr_write.Builder builder = nr_write.newBuilder();
builder.setKey(key);
builder.setValue(value);
builder.setTime(System.nanoTime());
NetMsg msg = NetMsg.newMessage();
msg.setMessageLite(builder);
msg.setMsgID(EMSGID.nr_write);
SendMsg2Leader(key.hashCode(), msg);
return true;
}
}