之前发了一篇"TripleDes的加解密Java、C#、php通用代码",后面又有项目用到了Rsa加解密,还是在不同系统之间进行交互,Rsa在不同语言的密钥格式不一样,所以过程中主要还是密钥转换问题,为方便密钥转换,写了一个XML和PEM格式的密钥转换工具,文章后面会提供密钥转换工具的下载地址,通过搜索参考和研究终于搞定了在Java、C#和Php都可以通用的加解密代码,整理了Rsa的加解密代码做个记录,以后可以参考,大家应该都知道Rsa算法,这里就不说明了,直接看代码(代码主要是公钥加密私钥解密,密文统一进行base64编码,密钥长度为2048): Java版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
package com.w3cnet.leetcode.utils; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.security.*; import java.security.interfaces.RSAPrivateKey; import java.security.interfaces.RSAPublicKey; import java.security.spec.PKCS8EncodedKeySpec; import java.security.spec.X509EncodedKeySpec; import java.util.HashMap; import java.util.Map; import javax.crypto.Cipher; import org.apache.commons.codec.binary.Base64; public class RsaUtil { /** * 加密算法 */ public static final String ALGORITHM = "RSA"; /** * 填充方式 */ public static final String PADDING_FORMAT = "RSA/ECB/PKCS1Padding"; /** * 明文块最大值 */ private static final int MAX_ENCRYPT_BLOCK = 245; /** * 密文块最大值 */ private static final int MAX_DECRYPT_BLOCK = 256; /** * 加密 * @param text 明文 * @param publicKey 公钥 * @param charset 字符编码 * @return 加密后的字符串(base64) * @throws Exception */ public static String encrypt(String text, String publicKey, String charset) throws Exception { byte[] data = text.getBytes(charset); byte[] keyBytes = Base64.decodeBase64(publicKey); X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes); KeyFactory keyFactory = KeyFactory.getInstance(ALGORITHM); Key publicK = keyFactory.generatePublic(x509KeySpec); // 对数据加密 Cipher cipher = Cipher.getInstance(PADDING_FORMAT); cipher.init(Cipher.ENCRYPT_MODE, publicK); int length = data.length; ByteArrayOutputStream out = new ByteArrayOutputStream(); int offSet = 0; byte[] cache; int i = 0; // 对数据分段加密 while (length - offSet > 0) { int len = length - offSet > MAX_ENCRYPT_BLOCK ? MAX_ENCRYPT_BLOCK : length - offSet; cache = cipher.doFinal(data, offSet, len); out.write(cache, 0, cache.length); i++; offSet = i * MAX_ENCRYPT_BLOCK; } byte[] cipherBytes = out.toByteArray(); out.close(); return Base64.encodeBase64String(cipherBytes); } /** * 私钥解密 * @param ciphertext 密文(base64) * @param privateKey 私钥 * @param charset 字符编码 * @return * @throws Exception */ public static String decrypt(String ciphertext, String privateKey, String charset) throws Exception { byte[] data = Base64.decodeBase64(ciphertext); byte[] keyBytes = Base64.decodeBase64(privateKey); PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(keyBytes); KeyFactory keyFactory = KeyFactory.getInstance(ALGORITHM); Key privateK = keyFactory.generatePrivate(pkcs8KeySpec); Cipher cipher = Cipher.getInstance(PADDING_FORMAT); cipher.init(Cipher.DECRYPT_MODE, privateK); int length = data.length; ByteArrayOutputStream out = new ByteArrayOutputStream(); int offSet = 0; byte[] cache; int i = 0; while (length - offSet > 0) { int len = length - offSet > MAX_DECRYPT_BLOCK ? MAX_DECRYPT_BLOCK : length - offSet; cache = cipher.doFinal(data, offSet, len); out.write(cache, 0, cache.length); i++; offSet = i * MAX_DECRYPT_BLOCK; } byte[] textBytes = out.toByteArray(); out.close(); return new String(textBytes, charset); } /** * 从文件中加载密钥字符串 * @return 是否成功 * @throws Exception */ public static String loadKeyString(String keyFile){ String keyString=""; InputStream in=null; BufferedReader br=null; try { in=RSACryptoUtil.class.getResourceAsStream("/"+keyFile); br= new BufferedReader(new InputStreamReader(in)); String readLine= null; StringBuilder sb= new StringBuilder(); while((readLine= br.readLine())!=null){ if(readLine.charAt(0)=='-'){ continue; }else{ sb.append(readLine); sb.append('\r'); } } keyString=sb.toString(); } catch (IOException e) { } catch (Exception e) { }finally{ if(br!=null){ try { br.close(); } catch (IOException e) { } } if(in!=null){ try { in.close(); } catch (IOException e) { } } } return keyString; } /** * 从文件中加载密钥字符串 根据文件路径加载 * @return 是否成功 * @throws Exception */ public static String loadKeyStringByPath(String keyFile){ String keyString=""; InputStream in=null; BufferedReader br=null; try { in = new FileInputStream(keyFile); br= new BufferedReader(new InputStreamReader(in)); String readLine= null; StringBuilder sb= new StringBuilder(); while((readLine= br.readLine())!=null){ if(readLine.charAt(0)=='-'){ continue; }else{ sb.append(readLine); sb.append('\r'); } } keyString=sb.toString(); } catch (IOException e) { System.out.println(e.getMessage()); } catch (Exception e) { System.out.println(e.getMessage()); }finally{ if(br!=null){ try { br.close(); } catch (IOException e) { } } if(in!=null){ try { in.close(); } catch (IOException e) { } } } return keyString; } public static void main(String[] args) throws Exception { String publicKey = RSAUtils.loadKeyStringByPath("D:/sso_public_key_test.pem"); System.out.println(RSACryptoUtil.encrypt("123", publicKey, "utf-8")); } } |
C#版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
public class RsaUtil { /// <summary> /// 加密 /// </summary> /// <param name="text">明文</param> /// <param name="publicKey">公钥</param> /// <returns>密文</returns> public static string Encrypt(string text, string publicKey) { var bytes = Encoding.UTF8.GetBytes(text); var length = bytes.Length; // rsa实例 var rsa = new RSACryptoServiceProvider(); rsa.FromXmlString(publicKey); var blockSize = (rsa.KeySize / 8) - 11; var offSet = 0; // 游标 var i = 0; byte[] cache; var ms = new MemoryStream(); while (length - offSet > 0) { var len = length - offSet > blockSize ? blockSize : length - offSet; var temp = new byte[len]; Array.Copy(bytes, offSet, temp, 0, len); cache = rsa.Encrypt(temp, false); ms.Write(cache, 0, cache.Length); i++; offSet = i * blockSize; } var cipherBytes = ms.ToArray(); return Convert.ToBase64String(cipherBytes); } /// <summary> /// RSA解密 /// </summary> /// <param name="ciphertext">密文</param> /// <param name="privateKey">私钥</param> /// <returns>明文</returns> public static string Decrypt(string ciphertext, string privateKey) { var blockSize = 256; var bytes = Convert.FromBase64String(ciphertext); var length = bytes.Length; // rsa实例 var rsa = new RSACryptoServiceProvider(); rsa.FromXmlString(privateKey); var offSet = 0; // 游标 var i = 0; byte[] cache; var ms = new MemoryStream(); while (length - offSet > 0) { var len = length - offSet > blockSize ? blockSize : length - offSet; var temp = new byte[len]; Array.Copy(bytes, offSet, temp, 0, len); cache = rsa.Decrypt(temp, false); ms.Write(cache, 0, cache.Length); i++; offSet = i * blockSize; } var cipherBytes = ms.ToArray(); return Encoding.UTF8.GetString(cipherBytes); } } |
PHP版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
<?php #私钥 $private_key = '-----BEGIN PRIVATE KEY----- MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCocmvQIWD2L9TW N1uKpLONwHcdy2oaKoW1CtI8PB+S+UjdEuLl3usWEuh3ZgLYloPyD9553SJFn7an fOplHITBqASOIXt9pi0CkahUPgwCPj4Dke5cT3fsp2i243/FX3afzrcf7FfTrEv/ LT22wF3csqrGt/UhqMcmpWIpPaL8edoCwGRBhghzEWaRwPZ2x5RPg+9qTSy0M7QG u+8EwYDnNkGGuuIwzNxklclic9Unp96eVp33vOP/LNvz90OcuqGQQcABh+e2ttv6 VzqlRcD1GRhxqkT3erXmgbQAVm6JQHArK1Up10No+lth1tri5TGcL0BEF7PetDIj jYh+aCLpAgMBAAECggEAPjFI1yaLyzmrxo/Xz5+x36NxF2IUQabziP1+09iK+9Po cB9aAO9GMvc2N2dFo7wm6Uesp6fa0IQAh2RakoxuA6ZKUEPSeXjSY4Ft+fSSsH1U njLSI+j/aTQCOIxUj4YIoUZMXJABeVjDEmscvw3VWffpj8c5zXyoUv9696kXNUoa uHOuphe1UNc3ghFIpWK+7ScxkPB6KtmAFwD4LSbT9OwhPozavBqouQvhLt1bXO8C ycKq4f1Pzhtt3sq3BRsPwqGhWt1NK95upKQuDnk1kJHjNzScH1FZxhsBHLxwOJ6E M7HP5Ywh78umusaNPVLMM6+YVWbENg+WZZcXd/XspQKBgQDRIhIukY7m7fRBH0CP mHi20gUmn7HzXgLxlbcU5n2PMDHHOg8D+Nmz1MnYPRQNwqxJdH8HOk8YQDC2uA+h BpyCpvPbp6y6SnZw8bhqiNfSHBjEftmg7lUU5xzBiBo6SyCyLkWD2xa/O8Vyh6ut /G1mQpNNJWWt5b+g5c4i0/fIhwKBgQDOMjGcLS8zHnZ30ZsWUS39iZ5TZXPCldTl Q9JB5PnB4CajYmQEdw3n9C6mOO42b+3M04mmqo1r0HiNyA9lZQQNsM3KqG+ngGvE 0DM1cAyuz3Fi3XTIhAbRyXXyVbqBp5Yh2/O2lvrsInusJrimDRgwglquu2GsdVrj +++b2DFFDwKBgQCkfXLlk/FdK44xZn5mM1vHGBubDIJv0+Lm14Yf90aMyDBu7fh/ fEznSBfWb/wE8riGMg3zxmYNwfdO0Cji04torB4kB5cxE35jSYxupuFxzk2gx9Eu 5iafgUQ56G4QqaS24PQmSL10fnPHqHRdLa1ygCzRwfdettVpnTbsZ+J9owKBgQDF o2Tb3o9sPxmsdVNiy8L6TstcAlU3wOfELQK+uFwQ0eoXFvrpMLg6iVmhZ9YkhZp4 hpZdEwLkwXib5ZOkS3PcL4jBZDtJYRVrG2jKIrF1aU60RbJnc+0ZbjHIaxWOqvSD VdE/RW4TomXKN38rYke6T2feLatMY1wQRG6BgXKQTwKBgQDKfK9Xuqzx+WI4AohT EwKrQEZUMD+6DTESHHG9As4zMJ9zU+6iPpM4Gw4CzfJZHhPP4dD+TC9NcvvR+GC9 UkwxmZrvP1+6KNFp0DZNk6M9wPZuYM/E63Vy0sFEA+/gFp4vQh8k7N8n/n7DhR3v kLuPdicfsKcz0thxzyDjv6oR2g== -----END PRIVATE KEY-----'; #公钥 $public_key = '-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqHJr0CFg9i/U1jdbiqSz jcB3HctqGiqFtQrSPDwfkvlI3RLi5d7rFhLod2YC2JaD8g/eed0iRZ+2p3zqZRyE wagEjiF7faYtApGoVD4MAj4+A5HuXE937KdotuN/xV92n863H+xX06xL/y09tsBd 3LKqxrf1IajHJqViKT2i/HnaAsBkQYYIcxFmkcD2dseUT4Pvak0stDO0BrvvBMGA 5zZBhrriMMzcZJXJYnPVJ6fenlad97zj/yzb8/dDnLqhkEHAAYfntrbb+lc6pUXA 9RkYcapE93q15oG0AFZuiUBwKytVKddDaPpbYdba4uUxnC9ARBez3rQyI42Ifmgi 6QIDAQAB -----END PUBLIC KEY-----'; $pi_key = openssl_pkey_get_private($private_key); $pu_key = openssl_pkey_get_public($public_key); #公钥加密数据 rsa+base64 $data = "hello world!"; openssl_public_encrypt($data,$encrypted,$pu_key); $base64_encrypted = base64_encode($encrypted); echo $base64_encrypted."\n"; #私钥解密数据 openssl_private_decrypt(base64_decode($base64_encrypted),$decrypted_data,$pi_key); echo $decrypted_data; ?> |
Rsa加解密C#和Java、php使用的密钥格式不一样,这里提供一个密钥转换工具:Rsa密钥转换工具v2.0 参考 https://www.cnblogs.com/jaamy/p/6118814.html
View Details一个类可以实现多个接口,但却只能继承最多一个抽象类。 抽象类可以包含具体的方法;接口的所有方法都是抽象的。 抽象类可以声明和使用字段;接口则不能,但可以创建静态的final常量。 抽象类中的方法可以是public、protected、private或者默认的pagckage;接口的方法都是public。 抽象类可以定义构造函数;接口不能。
View Details
1 2 3 4 5 6 7 8 |
@Test public void testHello() throws Exception { mockMvc.perform(MockMvcRequestBuilders.get("/hello").accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andExpect(content().string(equalTo("Hello World!"))); } |
是因为静态导入的原因,只要导入:
1 2 3 |
import static org.hamcrest.Matchers.equalTo; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
然后就OK了。 from:https://blog.csdn.net/qq_21549989/article/details/78873229
View Detailshttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd"> <!-- 把后面的3.x去掉 --> ——————— 作者:大道至简_lyon 来源:CSDN 原文:https://blog.csdn.net/a5601564/article/details/51585232 版权声明:本文为博主原创文章,转载请附上博文链接!
View Details服务治理可以说是微服务架构中最为核心和基础的模块,它主要用来实现各个微服务实例的自动化注册和发现。 Spring Cloud Eureka是Spring Cloud Netflix 微服务套件的一部分,主要负责完成微服务架构中的服务治理功能。 本文通过简单的小例子来分享下如何通过Eureka进行服务治理: 搭建服务注册中心 注册服务提供者 服务发现和消费 ==========我是华丽的分割线======================== 一、搭建服务注册中心 先列出完整目录结构: 搭建过程如下: 创建maven工程:eureka(具体实现略) 修改pom文件,引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sam</groupId> <artifactId>eureka</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.1.RELEASE</version> </parent> <properties> <javaVersion>1.8</javaVersion> </properties> <!-- 使用dependencyManagement进行版本管理 --> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR6</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- 引入eureka server依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka-server</artifactId> </dependency> </dependencies> </project> |
创建启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 |
/** * * @EnableEurekaServer * 用来指定该项目为Eureka的服务注册中心 */ @EnableEurekaServer @SpringBootApplication public class EurekaApp { public static void main(String[] args) { SpringApplication.run(EurekaApp.class, args); } } |
配置application.properties文件
1 2 3 4 5 6 7 8 9 10 11 12 |
#设置tomcat服务端口号 server.port=1111 #设置服务名称 spring.application.name=eureka-service eureka.instance.hostname=localhost #注册中心不需要注册自己 eureka.client.register-with-eureka=false #注册中心不需要去发现服务 eureka.client.fetch-registry=false #设置服务注册中心的URL eureka.client.serviceUrl.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka |
启动服务并访问,我们会看到这样的画面: 二、注册服务提供者 先列出完整目录结构: 搭建过程如下: 创建maven工程:hello-service(具体实现略) 修改pom文件,引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sam</groupId> <artifactId>hello-service</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.1.RELEASE</version> </parent> <properties> <javaVersion>1.8</javaVersion> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR6</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- 引入eureka 客户端依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> </dependencies> </project> |
创建启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
/*** * * @EnableDiscoveryClient * 让服务使用eureka服务器 * 实现服务注册和发现 * */ @EnableDiscoveryClient @SpringBootApplication public class HelloApp { public static void main(String[] args) { SpringApplication.run(HelloApp.class, args); } } |
创建controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@RestController public class HelloController { Logger logger = LoggerFactory.getLogger(HelloController.class); @Autowired DiscoveryClient discoveryClient; @RequestMapping("/hello") public String hello() { ServiceInstance instance = discoveryClient.getLocalServiceInstance(); //打印服务的服务id logger.info("*********" + instance.getServiceId()); return "hello,this is hello-service"; } } |
配置application.properties文件
1 2 3 4 5 |
server.port=9090 #设置服务名 spring.application.name=hello-service #设置服务注册中心的URL,本服务要向该服务注册中心注册自己 eureka.client.serviceUrl.defaultZone=http://localhost:1111/eureka |
启动并测试 )启动后再hello-service的控制台会有这种字样(xxx代表你的PC名)
1 |
Registered instance HELLO-SERVICE/xxx:hello-service:9090 with status UP (replication=false) |
eureka的控制台会打印出如下字样(xxx代表你的PC名)
1 |
Registered instance HELLO-SERVICE/xxx:hello-service:9090 with status UP (replication=false) |
)再次访问localhost:1111,会发现有服务注册到注册中心了 三、服务发现和消费 完整目录结构如下: 搭建过程: 创建maven工程(具体实现略) 修改pom文件,引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sam</groupId> <artifactId>hello-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.1.RELEASE</version> </parent> <properties> <javaVersion>1.8</javaVersion> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR6</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- 引入eureka 客户端依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <!-- 引入ribbon 依赖 ,用来实现负载均衡,我们这里只是使用,先不作其他介绍--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-ribbon</artifactId> </dependency> </dependencies> </project> 这里比hello-service服务提供者,多了ribbon的依赖 |
创建启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@EnableDiscoveryClient @SpringBootApplication public class ConsumerApp { //@Bean 应用在方法上,用来将方法返回值设为为bean @Bean @LoadBalanced //@LoadBalanced实现负载均衡 public RestTemplate restTemplate() { return new RestTemplate(); } public static void main(String[] args) { SpringApplication.run(ConsumerApp.class, args); } } |
1 |
这里也要用到@EnableDiscoveryClient, 让服务使用eureka服务器, 实现服务注册和发现 |
创建controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@RestController public class ConsumerController { //这里注入的restTemplate就是在com.sam.ConsumerApp中通过@Bean配置的实例 @Autowired RestTemplate restTemplate; @RequestMapping("/hello-consumer") public String helloConsumer() { //调用hello-service服务,注意这里用的是服务名,而不是具体的ip+port restTemplate.getForObject("http://hello-service/hello", String.class); return "hello consumer finish !!!"; } } |
配置application.properties文件
1 2 3 4 5 6 7 |
server.port=9999 spring.application.name=hello-consumer eureka.client.serviceUrl.defaultZone=http://localhost:1111/eureka #这里的配置项目和服务提供者hello-service一样 |
启动,测试 )启动eureka。为了展示负责均衡的效果,我们的hello-service启动两个服务,启动两个服务的具体步骤如下 以上是hello-service1的启动步骤,端口号为9090;同样方法设置hello-service2,端口号为9091(具体实现略)。 )启动hello-consumer )再次访问http://localhost:1111/,会发现有2个hello-service服务(端口号一个是9090,一个是9091),1个hello-consume服务 ) 多次访问http://localhost:9999/hello-consumer,会发现hello-service1和hello-service2会轮流被调用(已经实现了负责均衡),可以通过两者的控制台打印内容确认(还记得我们在hello-service的controller中有个loggerlogger.info("*********" + instance.getServiceId());吗?对,就是这个打印) 四、总结 以上实例实现了基本的服务治理: 通过spring-cloud-starter-eureka-server和@EnableEurekaServer实现服务注册中心 通过spring-cloud-starter-eureka和@EnableDiscoveryClient使用并注册到服务注册中心 通过spring-cloud-starter-eureka和@EnableDiscoveryClient使用注册中心并发现服务,通过spring-cloud-starter-ribbon来实现负载均衡消费服务 PS:这里说明下,我用的IDE是Spring Tool Suite,是spring定制版的eclipse,方便我们使用spring进行开发,有兴趣的朋友可以自行百度了解下。 from:https://www.cnblogs.com/sam-uncle/p/8954401.html
View DetailsEureka是Netflix开发的服务发现框架,本身是一个基于REST的服务,主要用于定位运行在AWS域中的中间层服务,以达到负载均衡和中间层服务故障转移的目的。SpringCloud将它集成在其子项目spring-cloud-netflix中,以实现SpringCloud的服务发现功能。 Eureka包含两个组件:Eureka Server和Eureka Client。 Eureka Server提供服务注册服务,各个节点启动后,会在Eureka Server中进行注册,这样EurekaServer中的服务注册表中将会存储所有可用服务节点的信息,服务节点的信息可以在界面中直观的看到。 Eureka Client是一个java客户端,用于简化与Eureka Server的交互,客户端同时也就是一个内置的、使用轮询(round-robin)负载算法的负载均衡器。 在应用启动后,将会向Eureka Server发送心跳,默认周期为30秒,如果Eureka Server在多个心跳周期内没有接收到某个节点的心跳,Eureka Server将会从服务注册表中把这个服务节点移除(默认90秒)。 Eureka Server之间通过复制的方式完成数据的同步,Eureka还提供了客户端缓存机制,即使所有的Eureka Server都挂掉,客户端依然可以利用缓存中的信息消费其他服务的API。综上,Eureka通过心跳检查、客户端缓存等机制,确保了系统的高可用性、灵活性和可伸缩性。 from:https://baike.baidu.com/item/Eureka/22402835?fr=aladdin
View Details出现的错误一: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sqlSessionFactory' defined in class path resource [applicationContext.xml]: Invocation of init method failed; nested exception is org.springframework.core.NestedIOException: Failed to parse config resource: class path resource [mybatis-config.xml]; nested exception is org.apache.ibatis.builder.BuilderException: Error parsing SQL Mapper Configuration. Cause: java.lang.ClassCastException: com.github.pagehelper.PageHelper cannot be cast to org.apache.ibatis.plugin.Interceptor at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1628) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:742) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:443) at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:325) at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107) at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:5017) at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5531) at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:901) at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:877) at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:652) at org.apache.catalina.startup.HostConfig.deployDirectory(HostConfig.java:1263) at org.apache.catalina.startup.HostConfig$DeployDirectory.run(HostConfig.java:1948) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.springframework.core.NestedIOException: Failed to parse config resource: class path resource [mybatis-config.xml]; nested exception is org.apache.ibatis.builder.BuilderException: Error parsing SQL Mapper Configuration. Cause: java.lang.ClassCastException: com.github.pagehelper.PageHelper cannot be cast to org.apache.ibatis.plugin.Interceptor at org.mybatis.spring.SqlSessionFactoryBean.buildSqlSessionFactory(SqlSessionFactoryBean.java:500) at org.mybatis.spring.SqlSessionFactoryBean.afterPropertiesSet(SqlSessionFactoryBean.java:380) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1687) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1624) … 25 more Caused by: org.apache.ibatis.builder.BuilderException: Error parsing SQL Mapper Configuration. Cause: java.lang.ClassCastException: com.github.pagehelper.PageHelper cannot be cast to org.apache.ibatis.plugin.Interceptor at org.apache.ibatis.builder.xml.XMLConfigBuilder.parseConfiguration(XMLConfigBuilder.java:121) at org.apache.ibatis.builder.xml.XMLConfigBuilder.parse(XMLConfigBuilder.java:99) at org.mybatis.spring.SqlSessionFactoryBean.buildSqlSessionFactory(SqlSessionFactoryBean.java:494) … 28 more Caused by: java.lang.ClassCastException: com.github.pagehelper.PageHelper cannot be cast to org.apache.ibatis.plugin.Interceptor at org.apache.ibatis.builder.xml.XMLConfigBuilder.pluginElement(XMLConfigBuilder.java:183) at org.apache.ibatis.builder.xml.XMLConfigBuilder.parseConfiguration(XMLConfigBuilder.java:110) … 30 more 十月 09, 2017 10:25:39 上午 org.apache.catalina.core.StandardContext listenerStart 严重: Exception sending context initialized event to listener instance of class org.springframework.web.context.ContextLoaderListener org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sqlSessionFactory' defined in class path resource [applicationContext.xml]: Invocation of init method failed; nested exception is org.springframework.core.NestedIOException: Failed to parse config resource: class path resource [mybatis-config.xml]; nested exception is org.apache.ibatis.builder.BuilderException: Error parsing SQL Mapper Configuration. Cause: java.lang.ClassCastException: com.github.pagehelper.PageHelper cannot be cast to org.apache.ibatis.plugin.Interceptor at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1628) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:742) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:443) at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:325) at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107) at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:5017) at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5531) at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:901) at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:877) at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:652) at org.apache.catalina.startup.HostConfig.deployDirectory(HostConfig.java:1263) at org.apache.catalina.startup.HostConfig$DeployDirectory.run(HostConfig.java:1948) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.springframework.core.NestedIOException: Failed to parse config resource: class path resource [mybatis-config.xml]; nested exception is org.apache.ibatis.builder.BuilderException: Error parsing SQL Mapper Configuration. Cause: java.lang.ClassCastException: com.github.pagehelper.PageHelper cannot be cast to org.apache.ibatis.plugin.Interceptor at org.mybatis.spring.SqlSessionFactoryBean.buildSqlSessionFactory(SqlSessionFactoryBean.java:500) at org.mybatis.spring.SqlSessionFactoryBean.afterPropertiesSet(SqlSessionFactoryBean.java:380) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1687) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1624) … 25 more Caused by: org.apache.ibatis.builder.BuilderException: Error parsing SQL Mapper Configuration. Cause: java.lang.ClassCastException: com.github.pagehelper.PageHelper cannot be cast to org.apache.ibatis.plugin.Interceptor at org.apache.ibatis.builder.xml.XMLConfigBuilder.parseConfiguration(XMLConfigBuilder.java:121) […]
View Details引入pom.xml
1 2 3 4 5 6 7 |
<!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/ https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
application.yml 配置部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#rabbitmq rabbitmq: host: 10.0.0.2 port: 5672 username: springboot password: password publisher-confirms: true publisher-returns: true template: mandatory: true #https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java listener: concurrency: 2 #最小消息监听线程数 max-concurrency: 2 #最大消息监听线程数 mybatis: mapper-locations: classpath:mapping/*.xml ··· |
创建配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; //错误的 import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。 * * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. * * Routing Key:路由关键字, * * exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 * * Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序. * Channel:消息通道,在客户端的每个连接里,可建立多个channel. * * */ @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String QUEUE_A = "QUEUE_A"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * * 针对消费者配置 * * 1. 设置交换机类型 * * 2. 将队列绑定到交换机 * * FanoutExchange: * 将消息分发到所有的绑定队列,无routingkey的概念 * * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 * * 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。 常用交换器类型如下: Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。 Topic(TopicExchange):按规则转发消息(最灵活)。 Headers(HeadersExchange):设置header attribute参数类型的交换机。 Fanout(FanoutExchange):转发消息到所有绑定队列。 * */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } /** * 获取队列A * * @return */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true); // 队列持久 } /** * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。 * * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } } |
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
package com.redis; import java.util.UUID; import org.springframework.amqp.core.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; @Component public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); // 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入 private RabbitTemplate rabbitTemplate; /** * 构造方法注入rabbitTemplate */ @Autowired public MsgProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); // rabbitTemplate如果为单例的话,那回调就是最后设置的内容 /** * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。 * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。 */ rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败"); System.out.println("消息主体 message : "+message); System.out.println("消息主体 message : "+replyCode); System.out.println("描述:"+replyText); System.out.println("消息使用的交换器 exchange : "+exchange); System.out.println("消息使用的路由键 routing : "+routingKey); } /** * rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送 rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。 * @param content */ public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId); String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString(); System.out.println("结束发送消息c : " + content.toLowerCase()); System.out.println("消费者响应c : " + response + " 消息处理完成"); //logger.info(" 发送消息TO A:" + content); // 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId); } /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回调id:" + correlationData); if (ack) { logger.info("消息成功消费"); } else { logger.info("消息消费失败:" + cause); } } } |
Receiver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MsgReceiver { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitHandler public void process(String content) { System.out.println("接收处理队列A当中的消息: " + content); } } |
unit test
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import java.util.Date; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.redis.MsgProducer; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqTest { @Autowired private MsgProducer sender; @Test public void sendTest() throws Exception { //while(true){ String msg = new Date().toString(); sender.sendMsg(msg); Thread.sleep(6000); //} } } |
可以测试通过 image.png 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
package com.test.ratelimit; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.RateLimiter; public class ComplexDemo { private static RateLimiter rateLimiter = RateLimiter.create(10); private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0); public static void main(String[] args) { // TODO Auto-generated method stub List<Runnable> tasks = new ArrayList<Runnable>(); for (int i = 0; i < 100; i++) { tasks.add(new UserRequest(i)); } ExecutorService threadPool = Executors.newCachedThreadPool(); for (Runnable runnable : tasks) { threadPool.execute(runnable); } } private static boolean startGo(int i) { //基于令牌桶算法的限流实现类 /** * 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。 * */ /** * tryAcquire(long timeout, TimeUnit unit) * 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话, * 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) */ //判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序 if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) { System.out.println("暂时无法获取令牌, 排队失败" + i); fail.getAndIncrement(); System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get()); return false; } if (update() > 0) { System.out.println("成功" + i); suc.getAndIncrement(); System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get()); return true; } System.out.println("数据不足,失败"); return false; } private static int update() { return 1; } private static class UserRequest implements Runnable { private int id; public UserRequest(int id) { this.id = id; } public void run() { startGo(id) ; } } } 测试结果: ... 成功8 FAIL/SUC=89/10 成功7 FAIL/SUC=89/11 |
总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。 REF: http://www.rabbitmq.com/install-rpm.html 作者:DONG999 链接:https://www.jianshu.com/p/f621b47c80c3 来源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
View Details1. 安装RabbitMQ linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。 erlang下载地址:http://www.erlang.org/download.html rabbitMQ下载:http://www.rabbitmq.com/download.html 直接下载安装即可。 因为想用可视化界面监控消息,所以先激活这个功能。
1 2 |
//到rabbitMQ安装目录的sbin目录下启动cmd黑窗口 E:\software\RabbitMQServer\rabbitmq_server-3.6.5\sbin>rabbitmq-plugins.bat enable rabbitmq_management |
然后重启rabbitMQ服务。输入网址:http://localhost:15672/。 使用默认用户guest/guest进入网页端控制台。 2. rabbitMQ基本原理和使用 rabbitMQ原理 Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 消息队列的使用过程大概如下 客户端连接到消息队列服务器,打开一个channel。 客户端声明一个exchange,并设置相关属性。 客户端声明一个queue,并设置相关属性。 客户端使用routing key,在exchange和queue之间建立好绑定关系。 客户端投递消息到exchange。 总结:exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 Direct交换机 完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。 Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。 Topic交换机 对key进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。 在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。 我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。 Fanout交换机 还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。 所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。 Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。 所以,Fanout Exchange 转发消息是最快的。 Headers交换机 首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。 持久化 RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分: – exchange持久化,在声明时指定durable => 1 – queue持久化,在声明时指定durable => 1 – 消息持久化,在投递时指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。 3. rabbitMQ-Direct交换机 […]
View DetailsAMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。 8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。 9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。 在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层: 1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。 2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。 3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。 RabbitMQ使用场景 学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html 场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。 Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } |
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } |
场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } |
发送端和场景1不同点: 1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue 2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable 3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue […]
View Details