增加服务端和客户端通讯方式

This commit is contained in:
fengjian 2020-02-25 11:53:49 +08:00
parent 5fcf70d790
commit ee382c4f33
7 changed files with 676 additions and 7 deletions

12
pom.xml
View File

@ -314,5 +314,17 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
</project>

View File

@ -2,6 +2,11 @@
client.verison=3.2
server.web.ip=localhost
server.web.port=80
#===============================netty配置=====================================
#配置netty.server.ip后将默认使用netty模式http模式将被禁用,host.name必须以nettyClient-开头
netty.server.ip=127.0.0.1
netty.server.port=7070
host.name=nettyClient-lf
#================================ 邮件=======================================
#smtp邮件IP 例smtp.qq.com
mail.smtp.ip=smtp.qq.com

View File

@ -0,0 +1,201 @@
package luckyclient.netty;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import luckyclient.utils.config.SysConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class ClientHandler extends ChannelInboundHandlerAdapter {
//从application.properties文件中获取用到的参数;
private static Resource resource = new ClassPathResource("application.properties");
private static Properties props;
static {
try {
props = PropertiesLoaderUtils.loadProperties(resource);
} catch (IOException e) {
e.printStackTrace();
}
}
private static String port = props.getProperty("server.port");
private int counter;
static final String ECHO_REQ = "$_";
private static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
private static final String HOST_NAME= SysConfig.getConfiguration().getProperty("host.name");
private static final String CLIENT_VERSION= SysConfig.getConfiguration().getProperty("client.verison");
private static final String SERVER_IP= SysConfig.getConfiguration().getProperty("server.web.ip");
private static final String SERVER_PORT= SysConfig.getConfiguration().getProperty("server.web.port");
private static ChannelHandlerContext ctx;
public ClientHandler() throws IOException {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException, InterruptedException {
//服务端消息处理,如果接收到测试任务方法则直接产生一个http请求并发送请求到本地
String jsonStr=new String(msg.toString().getBytes(),"UTF-8");
JSONObject json=new JSONObject();
try
{
json= JSON.parseObject(jsonStr);
}catch (Exception e)
{
log.error("收到服务端非Json消息"+jsonStr);
return;
}
log.info("收到服务端消息:"+json.toString());
//解析消息
if("run".equals(json.get("method"))){
try {
//返回请求
JSONObject re=new JSONObject();
re.put("method","return");
Result result=new Result(1,"同步等待消息返回",json.get("uuid").toString());
//如果是调度请求则发起一个HTTP请求
//获取是get方法还是post方法
String getOrPost=json.get("getOrPost").toString();
String urlParam="http://127.0.0.1:"+port+"/"+json.get("url").toString();
Integer socketTimeout=Integer.valueOf(json.get("socketTimeout").toString());
String tmpResult="";
if("get".equals(getOrPost))
{
Map<String,Object> jsonparams = (Map<String,Object>)json.get("data");
//get方法
try {
tmpResult=HttpRequest.httpClientGet(urlParam,jsonparams,socketTimeout);
} catch (Exception e) {
log.error("转发服务端GET请求出错");
}
}
else
{
String jsonparams=json.get("data").toString();
//post方法
try {
tmpResult=HttpRequest.httpClientPost(urlParam,jsonparams,socketTimeout);
} catch (Exception e) {
log.error("转发服务端POST请求出错");
}
}
result.setMessage(tmpResult);
re.put("data",result);
sendMessage(re.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else if("download".equals(json.get("method"))){
String loadpath=json.get("path").toString();
String path = System.getProperty("user.dir")+loadpath;
String fileName=json.get("fileName").toString();
//发出http请求下载文件
try {
boolean isSuccess=HttpRequest.downLoadFromUrl("http://"+SERVER_IP+":"+SERVER_PORT+"/common/download?fileName="+fileName,fileName,path);
//返回请求
JSONObject re=new JSONObject();
Result result=new Result(1,"上传成功",json.get("uuid").toString());
re.put("method","return");
re.put("data",result);
sendMessage(re.toString());
log.info("下载驱动包成功,路径为:"+path+";文件名为:"+fileName);
} catch (Exception e) {
e.printStackTrace();
//返回请求
JSONObject re=new JSONObject();
Result result=new Result(0,"上传失败",json.get("uuid").toString());
re.put("method","return");
re.put("data",result);
sendMessage(re.toString());
log.info("下载驱动包失败,路径为:"+path+";文件名为:"+fileName);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送客户端登录消息
JSONObject json=new JSONObject();
json.put("hostName",HOST_NAME);
json.put("version",CLIENT_VERSION);
json.put("method","clientUp");
ClientHandler.ctx=ctx;
sendMessage(json.toString());
System.out.println("通道已连接,客户端登录消息已发送");
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
{
System.out.println("连接已断开,正在尝试重连...");
//使用过程中断线重连
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
try {
NettyClient.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
/**发送心跳,保持长连接*/
JSONObject json=new JSONObject();
json.put("method","ping");
json.put("hostName",HOST_NAME);
ctx.channel().writeAndFlush(json.toString()+"$_").sync();
//log.info("心跳发送成功!");
}
}
super.userEventTriggered(ctx, evt);
}
public static void sendMessage(String json) throws InterruptedException {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer((json+"$_").getBytes()));
}
}

View File

@ -0,0 +1,315 @@
package luckyclient.netty;
import org.apache.http.HttpEntity;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Map.Entry;
public class HttpRequest {
private static final Logger log = LoggerFactory.getLogger(HttpRequest.class);
/**
* 使用HttpClient以JSON格式发送post请求
* @param urlParam
* @param jsonparams
* @param socketTimeout
* @return
* @throws NoSuchAlgorithmException
* @throws KeyManagementException
* @throws UnsupportedEncodingException
* @throws IOException
* @author Seagull
* @date 2019年5月14日
*/
public static String httpClientPost(String urlParam,String jsonparams,Integer socketTimeout) throws NoSuchAlgorithmException, KeyManagementException, UnsupportedEncodingException, IOException{
StringBuffer resultBuffer = null;
CloseableHttpClient httpclient= HttpClients.createDefault();
HttpPost httpPost = new HttpPost(urlParam);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(3000).setConnectionRequestTimeout(1500)
.setSocketTimeout(socketTimeout).build();
httpPost.setConfig(requestConfig);
httpPost.setHeader("Content-Type", "application/json");
// 构建请求参数
BufferedReader br = null;
try {
if(null!=jsonparams&&jsonparams.length()>0){
StringEntity entity = new StringEntity(jsonparams,"utf-8");
httpPost.setEntity(entity);
}
CloseableHttpResponse response = httpclient.execute(httpPost);
// 读取服务器响应数据
resultBuffer = new StringBuffer();
if(null!=response.getEntity()){
br = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "utf-8"));
String temp;
while ((temp = br.readLine()) != null) {
resultBuffer.append(temp);
}
}
} catch (RuntimeException e) {
log.error("网络链接出现异常,请检查...",e);
throw new RuntimeException(e);
} catch (ConnectException e) {
log.error("客户端网络无法链接,请检查...",e);
throw new ConnectException();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
br = null;
throw new RuntimeException(e);
}
}
}
return resultBuffer.toString();
}
/**
* 使用HttpClient以JSON格式发送get请求
* @param urlParam
* @param params
* @param socketTimeout
* @return
* @throws NoSuchAlgorithmException
* @throws KeyManagementException
* @throws NoHttpResponseException
* @author Seagull
* @date 2019年5月14日
*/
public static String httpClientGet(String urlParam, Map<String, Object> params,Integer socketTimeout) throws NoSuchAlgorithmException, KeyManagementException, NoHttpResponseException {
StringBuffer resultBuffer = null;
CloseableHttpClient httpclient= HttpClients.createDefault();
BufferedReader br = null;
// 构建请求参数
StringBuffer sbParams = new StringBuffer();
if (params != null && params.size() > 0) {
for (Entry<String, Object> entry : params.entrySet()) {
sbParams.append(entry.getKey());
sbParams.append("=");
try {
sbParams.append(URLEncoder.encode(String.valueOf(entry.getValue()), "utf-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
sbParams.append("&");
}
}
if (sbParams != null && sbParams.length() > 0) {
urlParam = urlParam + "?" + sbParams.substring(0, sbParams.length() - 1);
}
HttpGet httpGet = new HttpGet(urlParam);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(3000).setConnectionRequestTimeout(1500)
.setSocketTimeout(socketTimeout).build();
httpGet.setConfig(requestConfig);
try {
CloseableHttpResponse response = httpclient.execute(httpGet);
// 读取服务器响应数据
br = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "utf-8"));
String temp;
resultBuffer = new StringBuffer();
while ((temp = br.readLine()) != null) {
resultBuffer.append(temp);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
br = null;
throw new RuntimeException(e);
}
}
}
return resultBuffer.toString();
}
/**
* 上传文件
* @param urlParam
* @param loadpath
* @param file
* @return
* @throws NoSuchAlgorithmException
* @throws KeyManagementException
* @throws HttpHostConnectException
* @author Seagull
* @date 2019年3月15日
*/
public static String httpClientUploadFile(String urlParam, String loadpath, File file) throws NoSuchAlgorithmException, KeyManagementException, HttpHostConnectException {
StringBuffer resultBuffer = null;
CloseableHttpClient httpclient= HttpClients.createDefault();
HttpPost httpPost = new HttpPost(urlParam);
// 构建请求参数
BufferedReader br = null;
try {
MultipartEntityBuilder entityBuilder = MultipartEntityBuilder.create();
//设置请求的编码格式
entityBuilder.setCharset(Charset.forName("utf-8"));
entityBuilder.addBinaryBody("jarfile", file);
entityBuilder.addTextBody("loadpath", loadpath);
HttpEntity reqEntity =entityBuilder.build();
httpPost.setEntity(reqEntity);
CloseableHttpResponse response = httpclient.execute(httpPost);
//从状态行中获取状态码
String responsecode = String.valueOf(response.getStatusLine().getStatusCode());
// 读取服务器响应数据
resultBuffer = new StringBuffer();
br = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "utf-8"));
String temp;
while ((temp = br.readLine()) != null) {
resultBuffer.append(temp);
}
if(resultBuffer.length()==0){
resultBuffer.append("上传文件异常,响应码:"+responsecode);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
br = null;
throw new RuntimeException(e);
}
}
}
return resultBuffer.toString();
}
/**
* 获取文件流
* @param urlParam
* @param params
* @return
* @throws IOException
* @throws HttpHostConnectException
* @author Seagull
* @date 2019年3月15日
*/
public static byte[] getFile(String urlParam, Map<String, Object> params) throws IOException, HttpHostConnectException {
// 构建请求参数
StringBuffer sbParams = new StringBuffer();
if (params != null && params.size() > 0) {
for (Entry<String, Object> entry : params.entrySet()) {
sbParams.append(entry.getKey());
sbParams.append("=");
try {
sbParams.append(URLEncoder.encode(String.valueOf(entry.getValue()), "utf-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
sbParams.append("&");
}
}
if (sbParams != null && sbParams.length() > 0) {
urlParam = urlParam + "?" + sbParams.substring(0, sbParams.length() - 1);
}
URL urlConet = new URL(urlParam);
HttpURLConnection con = (HttpURLConnection)urlConet.openConnection();
con.setRequestMethod("GET");
con.setConnectTimeout(4 * 1000);
InputStream inStream = con .getInputStream(); //通过输入流获取图片数据
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
byte[] buffer = new byte[2048];
int len = 0;
while( (len=inStream.read(buffer)) != -1 ){
outStream.write(buffer, 0, len);
}
inStream.close();
byte[] data = outStream.toByteArray();
return data;
}
/**
* 从网络Url中下载文件
* @param urlStr
* @param fileName
* @param savePath
* @throws IOException
*/
public static boolean downLoadFromUrl(String urlStr,String fileName,String savePath) throws IOException{
try
{
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
//设置超时间为3秒
conn.setConnectTimeout(3*1000);
//防止屏蔽程序抓取而返回403错误
conn.setRequestProperty("User-Agent", "Mozilla/4.0 (compatible; MSIE 5.0; Windows NT; DigExt)");
//得到输入流
InputStream inputStream = conn.getInputStream();
//获取自己数组
byte[] getData = readInputStream(inputStream);
//文件保存位置
File saveDir = new File(savePath);
if(!saveDir.exists()){
saveDir.mkdir();
}
File file = new File(saveDir+File.separator+fileName);
FileOutputStream fos = new FileOutputStream(file);
fos.write(getData);
if(fos!=null){
fos.close();
}
if(inputStream!=null){
inputStream.close();
}
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
/**
* 从输入流中获取字节数组
* @param inputStream
* @return
* @throws IOException
*/
public static byte[] readInputStream(InputStream inputStream) throws IOException {
byte[] buffer = new byte[1024];
int len = 0;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while((len = inputStream.read(buffer)) != -1) {
bos.write(buffer, 0, len);
}
bos.close();
return bos.toByteArray();
}
}

View File

@ -0,0 +1,86 @@
package luckyclient.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import luckyclient.utils.config.SysConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
public class NettyClient {
private static final String NETTY_SERVER_IP= SysConfig.getConfiguration().getProperty("netty.server.ip");
private static final int NETTY_SERVER_PORT=Integer.parseInt(SysConfig.getConfiguration().getProperty("netty.server.port"));
private static Channel channel;
private static ChannelFuture connect;
private static final Logger log = LoggerFactory.getLogger(NettyClient.class);
private static ClientHandler clientHandler;
public static void start() throws InterruptedException{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
clientHandler=new ClientHandler();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new StringDecoder());
p.addLast("encoder", new StringEncoder());
p.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
p.addLast(new IdleStateHandler(1,0,0,TimeUnit.SECONDS));
p.addLast(clientHandler);
}
});
//连接服务端
connect = b.connect(NETTY_SERVER_IP, NETTY_SERVER_PORT);
//断线重连
connect.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
try {
log.error("服务端链接不上,开始重连操作...");
start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1L, TimeUnit.SECONDS);
} else {
channel = channelFuture.channel();
log.info("服务端链接成功...");
}
}
});
} catch (IOException e) {
e.printStackTrace();
} finally {
//不关闭通道
//group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,39 @@
package luckyclient.netty;
import java.io.Serializable;
public class Result implements Serializable {
private int code;
private Object message;
private String uniId;
public Result(int code, Object message, String uniId) {
this.code = code;
this.message = message;
this.uniId = uniId;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public Object getMessage() {
return message;
}
public void setMessage(Object message) {
this.message = message;
}
public String getUniId() {
return uniId;
}
public void setUniId(String uniId) {
this.uniId = uniId;
}
}

View File

@ -4,6 +4,8 @@ import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import luckyclient.netty.NettyClient;
import luckyclient.utils.config.SysConfig;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,17 +25,26 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
public class RunService {
private static final Logger log = LoggerFactory.getLogger(RunService.class);
private static final String NETTY_SERVER_IP= SysConfig.getConfiguration().getProperty("netty.server.ip");
public static void main(String[] args) {
// TODO Auto-generated method stub
PropertyConfigurator.configure(System.getProperty("user.dir") + File.separator +"bootlog4j.conf");
SpringApplication.run(RunService.class, args);
try {
String host = InetAddress.getLocalHost().getHostAddress();
log.info("启动客户端监听,请稍后......监听IP:{}",host);
} catch (UnknownHostException e) {
log.error("获取服务IP出现异常......", e);
}
HttpImpl.checkHostNet();
try {
String host = InetAddress.getLocalHost().getHostAddress();
log.info("启动客户端监听,请稍后......监听IP:{}",host);
} catch (Exception e) {
log.error("获取服务IP出现异常......", e);
}
if(NETTY_SERVER_IP==null)
HttpImpl.checkHostNet();
try {
log.error("##################客户端netty开启#################");
NettyClient.start();
} catch (Exception e) {
log.error("连接服务端netty异常......");
e.printStackTrace();
}
}
}