diff --git a/pom.xml b/pom.xml index c3e0ceb..e7338ef 100644 --- a/pom.xml +++ b/pom.xml @@ -314,5 +314,17 @@ mysql mysql-connector-java + + + io.netty + netty-all + 5.0.0.Alpha2 + + + javax.xml.bind + jaxb-api + 2.3.1 + + diff --git a/src/main/Resources/sys_config.properties b/src/main/Resources/sys_config.properties index 26be9d3..bd85117 100644 --- a/src/main/Resources/sys_config.properties +++ b/src/main/Resources/sys_config.properties @@ -2,6 +2,11 @@ client.verison=3.2 server.web.ip=localhost server.web.port=80 +#===============================netty配置===================================== +#配置netty.server.ip后,将默认使用netty模式,http模式将被禁用,host.name必须以nettyClient-开头 +netty.server.ip=127.0.0.1 +netty.server.port=7070 +host.name=nettyClient-lf #================================ 邮件======================================= #smtp邮件IP 例:smtp.qq.com mail.smtp.ip=smtp.qq.com diff --git a/src/main/java/luckyclient/netty/ClientHandler.java b/src/main/java/luckyclient/netty/ClientHandler.java new file mode 100644 index 0000000..3928b91 --- /dev/null +++ b/src/main/java/luckyclient/netty/ClientHandler.java @@ -0,0 +1,201 @@ +package luckyclient.netty; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.EventLoop; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import luckyclient.utils.config.SysConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PropertiesLoaderUtils; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +public class ClientHandler extends ChannelInboundHandlerAdapter { + + //从application.properties文件中获取用到的参数; + private static Resource resource = new ClassPathResource("application.properties"); + private static Properties props; + + static { + try { + props = PropertiesLoaderUtils.loadProperties(resource); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static String port = props.getProperty("server.port"); + + private int counter; + + static final String ECHO_REQ = "$_"; + + private static final Logger log = LoggerFactory.getLogger(ClientHandler.class); + + private static final String HOST_NAME= SysConfig.getConfiguration().getProperty("host.name"); + + private static final String CLIENT_VERSION= SysConfig.getConfiguration().getProperty("client.verison"); + + private static final String SERVER_IP= SysConfig.getConfiguration().getProperty("server.web.ip"); + + private static final String SERVER_PORT= SysConfig.getConfiguration().getProperty("server.web.port"); + + private static ChannelHandlerContext ctx; + + public ClientHandler() throws IOException { + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException, InterruptedException { + //服务端消息处理,如果接收到测试任务方法,则直接产生一个http请求并发送请求到本地 + String jsonStr=new String(msg.toString().getBytes(),"UTF-8"); + JSONObject json=new JSONObject(); + try + { + json= JSON.parseObject(jsonStr); + + }catch (Exception e) + { + log.error("收到服务端非Json消息:"+jsonStr); + return; + } + log.info("收到服务端消息:"+json.toString()); + //解析消息 + if("run".equals(json.get("method"))){ + try { + //返回请求 + JSONObject re=new JSONObject(); + re.put("method","return"); + Result result=new Result(1,"同步等待消息返回",json.get("uuid").toString()); + //如果是调度请求,则发起一个HTTP请求 + //获取是get方法还是post方法 + String getOrPost=json.get("getOrPost").toString(); + String urlParam="http://127.0.0.1:"+port+"/"+json.get("url").toString(); + Integer socketTimeout=Integer.valueOf(json.get("socketTimeout").toString()); + String tmpResult=""; + if("get".equals(getOrPost)) + { + Map jsonparams = (Map)json.get("data"); + //get方法 + try { + tmpResult=HttpRequest.httpClientGet(urlParam,jsonparams,socketTimeout); + } catch (Exception e) { + log.error("转发服务端GET请求出错"); + } + } + else + { + String jsonparams=json.get("data").toString(); + //post方法 + try { + tmpResult=HttpRequest.httpClientPost(urlParam,jsonparams,socketTimeout); + } catch (Exception e) { + log.error("转发服务端POST请求出错"); + } + } + result.setMessage(tmpResult); + re.put("data",result); + sendMessage(re.toString()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + else if("download".equals(json.get("method"))){ + String loadpath=json.get("path").toString(); + String path = System.getProperty("user.dir")+loadpath; + String fileName=json.get("fileName").toString(); + //发出http请求下载文件 + try { + boolean isSuccess=HttpRequest.downLoadFromUrl("http://"+SERVER_IP+":"+SERVER_PORT+"/common/download?fileName="+fileName,fileName,path); + //返回请求 + JSONObject re=new JSONObject(); + Result result=new Result(1,"上传成功",json.get("uuid").toString()); + re.put("method","return"); + re.put("data",result); + sendMessage(re.toString()); + log.info("下载驱动包成功,路径为:"+path+";文件名为:"+fileName); + } catch (Exception e) { + e.printStackTrace(); + //返回请求 + JSONObject re=new JSONObject(); + Result result=new Result(0,"上传失败",json.get("uuid").toString()); + re.put("method","return"); + re.put("data",result); + sendMessage(re.toString()); + log.info("下载驱动包失败,路径为:"+path+";文件名为:"+fileName); + } + } + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + //发送客户端登录消息 + JSONObject json=new JSONObject(); + json.put("hostName",HOST_NAME); + json.put("version",CLIENT_VERSION); + json.put("method","clientUp"); + ClientHandler.ctx=ctx; + sendMessage(json.toString()); + System.out.println("通道已连接,客户端登录消息已发送"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) + { + System.out.println("连接已断开,正在尝试重连..."); + //使用过程中断线重连 + final EventLoop eventLoop = ctx.channel().eventLoop(); + eventLoop.schedule(new Runnable() { + @Override + public void run() { + try { + NettyClient.start(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }, 1, TimeUnit.SECONDS); + + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) + throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state().equals(IdleState.READER_IDLE)) { + /**发送心跳,保持长连接*/ + JSONObject json=new JSONObject(); + json.put("method","ping"); + json.put("hostName",HOST_NAME); + ctx.channel().writeAndFlush(json.toString()+"$_").sync(); + //log.info("心跳发送成功!"); + } + } + super.userEventTriggered(ctx, evt); + } + + + public static void sendMessage(String json) throws InterruptedException { + ctx.channel().writeAndFlush(Unpooled.copiedBuffer((json+"$_").getBytes())); + } +} \ No newline at end of file diff --git a/src/main/java/luckyclient/netty/HttpRequest.java b/src/main/java/luckyclient/netty/HttpRequest.java new file mode 100644 index 0000000..1061f87 --- /dev/null +++ b/src/main/java/luckyclient/netty/HttpRequest.java @@ -0,0 +1,315 @@ +package luckyclient.netty; + +import org.apache.http.HttpEntity; +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.entity.StringEntity; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.ConnectException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.Charset; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.Map; +import java.util.Map.Entry; + +public class HttpRequest { + private static final Logger log = LoggerFactory.getLogger(HttpRequest.class); + /** + * 使用HttpClient以JSON格式发送post请求 + * @param urlParam + * @param jsonparams + * @param socketTimeout + * @return + * @throws NoSuchAlgorithmException + * @throws KeyManagementException + * @throws UnsupportedEncodingException + * @throws IOException + * @author Seagull + * @date 2019年5月14日 + */ + public static String httpClientPost(String urlParam,String jsonparams,Integer socketTimeout) throws NoSuchAlgorithmException, KeyManagementException, UnsupportedEncodingException, IOException{ + StringBuffer resultBuffer = null; + CloseableHttpClient httpclient= HttpClients.createDefault(); + HttpPost httpPost = new HttpPost(urlParam); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(3000).setConnectionRequestTimeout(1500) + .setSocketTimeout(socketTimeout).build(); + httpPost.setConfig(requestConfig); + httpPost.setHeader("Content-Type", "application/json"); + // 构建请求参数 + BufferedReader br = null; + try { + if(null!=jsonparams&&jsonparams.length()>0){ + StringEntity entity = new StringEntity(jsonparams,"utf-8"); + httpPost.setEntity(entity); + } + + CloseableHttpResponse response = httpclient.execute(httpPost); + + // 读取服务器响应数据 + resultBuffer = new StringBuffer(); + if(null!=response.getEntity()){ + br = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "utf-8")); + String temp; + while ((temp = br.readLine()) != null) { + resultBuffer.append(temp); + } + } + } catch (RuntimeException e) { + log.error("网络链接出现异常,请检查...",e); + throw new RuntimeException(e); + } catch (ConnectException e) { + log.error("客户端网络无法链接,请检查...",e); + throw new ConnectException(); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + br = null; + throw new RuntimeException(e); + } + } + } + return resultBuffer.toString(); + } + + /** + * 使用HttpClient以JSON格式发送get请求 + * @param urlParam + * @param params + * @param socketTimeout + * @return + * @throws NoSuchAlgorithmException + * @throws KeyManagementException + * @throws NoHttpResponseException + * @author Seagull + * @date 2019年5月14日 + */ + public static String httpClientGet(String urlParam, Map params,Integer socketTimeout) throws NoSuchAlgorithmException, KeyManagementException, NoHttpResponseException { + StringBuffer resultBuffer = null; + CloseableHttpClient httpclient= HttpClients.createDefault(); + BufferedReader br = null; + // 构建请求参数 + StringBuffer sbParams = new StringBuffer(); + if (params != null && params.size() > 0) { + for (Entry entry : params.entrySet()) { + sbParams.append(entry.getKey()); + sbParams.append("="); + try { + sbParams.append(URLEncoder.encode(String.valueOf(entry.getValue()), "utf-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + sbParams.append("&"); + } + } + if (sbParams != null && sbParams.length() > 0) { + urlParam = urlParam + "?" + sbParams.substring(0, sbParams.length() - 1); + } + HttpGet httpGet = new HttpGet(urlParam); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(3000).setConnectionRequestTimeout(1500) + .setSocketTimeout(socketTimeout).build(); + httpGet.setConfig(requestConfig); + try { + CloseableHttpResponse response = httpclient.execute(httpGet); + + // 读取服务器响应数据 + br = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "utf-8")); + String temp; + resultBuffer = new StringBuffer(); + while ((temp = br.readLine()) != null) { + resultBuffer.append(temp); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + br = null; + throw new RuntimeException(e); + } + } + } + return resultBuffer.toString(); + } + + + /** + * 上传文件 + * @param urlParam + * @param loadpath + * @param file + * @return + * @throws NoSuchAlgorithmException + * @throws KeyManagementException + * @throws HttpHostConnectException + * @author Seagull + * @date 2019年3月15日 + */ + public static String httpClientUploadFile(String urlParam, String loadpath, File file) throws NoSuchAlgorithmException, KeyManagementException, HttpHostConnectException { + StringBuffer resultBuffer = null; + CloseableHttpClient httpclient= HttpClients.createDefault(); + HttpPost httpPost = new HttpPost(urlParam); + // 构建请求参数 + BufferedReader br = null; + try { + MultipartEntityBuilder entityBuilder = MultipartEntityBuilder.create(); + //设置请求的编码格式 + entityBuilder.setCharset(Charset.forName("utf-8")); + entityBuilder.addBinaryBody("jarfile", file); + entityBuilder.addTextBody("loadpath", loadpath); + HttpEntity reqEntity =entityBuilder.build(); + httpPost.setEntity(reqEntity); + + CloseableHttpResponse response = httpclient.execute(httpPost); + //从状态行中获取状态码 + String responsecode = String.valueOf(response.getStatusLine().getStatusCode()); + // 读取服务器响应数据 + resultBuffer = new StringBuffer(); + + br = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "utf-8")); + String temp; + while ((temp = br.readLine()) != null) { + resultBuffer.append(temp); + } + if(resultBuffer.length()==0){ + resultBuffer.append("上传文件异常,响应码:"+responsecode); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + br = null; + throw new RuntimeException(e); + } + } + } + return resultBuffer.toString(); + } + + /** + * 获取文件流 + * @param urlParam + * @param params + * @return + * @throws IOException + * @throws HttpHostConnectException + * @author Seagull + * @date 2019年3月15日 + */ + public static byte[] getFile(String urlParam, Map params) throws IOException, HttpHostConnectException { + // 构建请求参数 + StringBuffer sbParams = new StringBuffer(); + if (params != null && params.size() > 0) { + for (Entry entry : params.entrySet()) { + sbParams.append(entry.getKey()); + sbParams.append("="); + try { + sbParams.append(URLEncoder.encode(String.valueOf(entry.getValue()), "utf-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + sbParams.append("&"); + } + } + if (sbParams != null && sbParams.length() > 0) { + urlParam = urlParam + "?" + sbParams.substring(0, sbParams.length() - 1); + } + URL urlConet = new URL(urlParam); + HttpURLConnection con = (HttpURLConnection)urlConet.openConnection(); + con.setRequestMethod("GET"); + con.setConnectTimeout(4 * 1000); + InputStream inStream = con .getInputStream(); //通过输入流获取图片数据 + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[2048]; + int len = 0; + while( (len=inStream.read(buffer)) != -1 ){ + outStream.write(buffer, 0, len); + } + inStream.close(); + byte[] data = outStream.toByteArray(); + return data; + } + + /** + * 从网络Url中下载文件 + * @param urlStr + * @param fileName + * @param savePath + * @throws IOException + */ + public static boolean downLoadFromUrl(String urlStr,String fileName,String savePath) throws IOException{ + try + { + URL url = new URL(urlStr); + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + //设置超时间为3秒 + conn.setConnectTimeout(3*1000); + //防止屏蔽程序抓取而返回403错误 + conn.setRequestProperty("User-Agent", "Mozilla/4.0 (compatible; MSIE 5.0; Windows NT; DigExt)"); + + //得到输入流 + InputStream inputStream = conn.getInputStream(); + //获取自己数组 + byte[] getData = readInputStream(inputStream); + + //文件保存位置 + File saveDir = new File(savePath); + if(!saveDir.exists()){ + saveDir.mkdir(); + } + File file = new File(saveDir+File.separator+fileName); + FileOutputStream fos = new FileOutputStream(file); + fos.write(getData); + if(fos!=null){ + fos.close(); + } + if(inputStream!=null){ + inputStream.close(); + } + return true; + }catch (Exception e){ + e.printStackTrace(); + return false; + } + } + + /** + * 从输入流中获取字节数组 + * @param inputStream + * @return + * @throws IOException + */ + public static byte[] readInputStream(InputStream inputStream) throws IOException { + byte[] buffer = new byte[1024]; + int len = 0; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + while((len = inputStream.read(buffer)) != -1) { + bos.write(buffer, 0, len); + } + bos.close(); + return bos.toByteArray(); + } + + +} diff --git a/src/main/java/luckyclient/netty/NettyClient.java b/src/main/java/luckyclient/netty/NettyClient.java new file mode 100644 index 0000000..a01799b --- /dev/null +++ b/src/main/java/luckyclient/netty/NettyClient.java @@ -0,0 +1,86 @@ +package luckyclient.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.timeout.IdleStateHandler; +import luckyclient.utils.config.SysConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.concurrent.TimeUnit; + +public class NettyClient { + private static final String NETTY_SERVER_IP= SysConfig.getConfiguration().getProperty("netty.server.ip"); + + private static final int NETTY_SERVER_PORT=Integer.parseInt(SysConfig.getConfiguration().getProperty("netty.server.port")); + + private static Channel channel; + + private static ChannelFuture connect; + + private static final Logger log = LoggerFactory.getLogger(NettyClient.class); + + private static ClientHandler clientHandler; + public static void start() throws InterruptedException{ + EventLoopGroup group = new NioEventLoopGroup(); + try { + Bootstrap b = new Bootstrap(); + clientHandler=new ClientHandler(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE,true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); + ChannelPipeline p = ch.pipeline(); + p.addLast("decoder", new StringDecoder()); + p.addLast("encoder", new StringEncoder()); + p.addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); + p.addLast(new IdleStateHandler(1,0,0,TimeUnit.SECONDS)); + p.addLast(clientHandler); + } + }); + //连接服务端 + connect = b.connect(NETTY_SERVER_IP, NETTY_SERVER_PORT); + //断线重连 + connect.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if (!channelFuture.isSuccess()) { + final EventLoop loop = channelFuture.channel().eventLoop(); + loop.schedule(new Runnable() { + @Override + public void run() { + try { + log.error("服务端链接不上,开始重连操作..."); + start(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }, 1L, TimeUnit.SECONDS); + } else { + channel = channelFuture.channel(); + log.info("服务端链接成功..."); + } + } + }); + } catch (IOException e) { + e.printStackTrace(); + } finally { + //不关闭通道 + //group.shutdownGracefully(); + } + } +} diff --git a/src/main/java/luckyclient/netty/Result.java b/src/main/java/luckyclient/netty/Result.java new file mode 100644 index 0000000..f7520bd --- /dev/null +++ b/src/main/java/luckyclient/netty/Result.java @@ -0,0 +1,39 @@ +package luckyclient.netty; + +import java.io.Serializable; + +public class Result implements Serializable { + private int code; + private Object message; + private String uniId; + + public Result(int code, Object message, String uniId) { + this.code = code; + this.message = message; + this.uniId = uniId; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public Object getMessage() { + return message; + } + + public void setMessage(Object message) { + this.message = message; + } + + public String getUniId() { + return uniId; + } + + public void setUniId(String uniId) { + this.uniId = uniId; + } +} diff --git a/src/main/java/springboot/RunService.java b/src/main/java/springboot/RunService.java index bab4513..9d7ce65 100644 --- a/src/main/java/springboot/RunService.java +++ b/src/main/java/springboot/RunService.java @@ -4,6 +4,8 @@ import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; +import luckyclient.netty.NettyClient; +import luckyclient.utils.config.SysConfig; import org.apache.log4j.PropertyConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,17 +25,26 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class RunService { private static final Logger log = LoggerFactory.getLogger(RunService.class); + private static final String NETTY_SERVER_IP= SysConfig.getConfiguration().getProperty("netty.server.ip"); public static void main(String[] args) { // TODO Auto-generated method stub PropertyConfigurator.configure(System.getProperty("user.dir") + File.separator +"bootlog4j.conf"); SpringApplication.run(RunService.class, args); - try { - String host = InetAddress.getLocalHost().getHostAddress(); - log.info("启动客户端监听,请稍后......监听IP:{}",host); - } catch (UnknownHostException e) { - log.error("获取服务IP出现异常......", e); - } - HttpImpl.checkHostNet(); + try { + String host = InetAddress.getLocalHost().getHostAddress(); + log.info("启动客户端监听,请稍后......监听IP:{}",host); + } catch (Exception e) { + log.error("获取服务IP出现异常......", e); + } + if(NETTY_SERVER_IP==null) + HttpImpl.checkHostNet(); + try { + log.error("##################客户端netty开启#################"); + NettyClient.start(); + } catch (Exception e) { + log.error("连接服务端netty异常......"); + e.printStackTrace(); + } } }