Appearance
后端开发常见问题
- JSON_Web_Token
- 灰度发布
- 消息队列
- 存储过程
- 数据库读写分离
- SQL 分页
- SQL 索引原理
- 数据库查询优化
- 锁
- 线程池
- node爬虫
- 后端获取客户IP
- 获取系统并发用户数
- 高并发处理中缓存、降级、限流技术
- 数据库性能优化指导
- 阿里云消息队列RocketMQ
- 外部排序
- 图状数据库GraphQL
- OAuth
- 10万条数据批量插入
JSON_Web_Token
参考链接:
详解:
web api 的用户用户角色验证中涉及用户身份识别的问题。
以往后端保存数据到 session 后,把 sessionID 通过 cookie 传给前端,前端每次请求都通过 cookie 传回,后端在 session 找到前期保存的数据,校验相等,即可识别用户。这种方法的缺点是跨域服务 session 难共享(A 和 B 网站关联服务,要求 A 登录后,到 B 处能自动登录),如果采用数据库持久化,则较为麻烦。因此采用 jwt 的办法。
jwt 的数据结构:“base64UrlEncode(Header).base64UrlEncode(Payload).Signature”
Header
javascript{ "alg": "HS256",//签名的算法,,默认是 HMAC SHA256(写成 HS256) "typ": "JWT"//令牌(token)的类型(type),JWT 令牌统一写为JWT }
Payload
javascript{ "iss": "(issuer):签发人" "exp": "(expiration time):过期时间" "sub": "(subject):主题" "aud": "(audience):受众" "nbf": "(Not Before):生效时间" "iat": "(Issued At):签发时间" "jti": "(JWT ID):编号" //以上7个官方字段可供选用,下方可加自定义字段 //因为jwt不加密,所以不能传递敏感信息 }
Signature:需要先指定秘钥,写到网站配置
textHMACSHA256(base64UrlEncode(header) + "." +base64UrlEncode(payload),secret) base64UrlEncode是基于base64之上,把=忽略,把+替换为-,把/替换为_
使用方式:把 jwt 放在请求头 Header 中的 Authorization 字段中
注意:jwt 包含认证信息,一旦被盗用,则可拥有所有权限,所以 jwt 有效期要设置得比较短,且用 https 传输
灰度发布
参考链接:
详解
概念
灰度发布指某次新发布功能特性和旧功能特性之间能够以平滑过渡的方式呈现给用户
实现原理
后端判断用户是否在灰度测试名单内(查数据库,有点像游戏报名公测),决定渲染前端哪个版本的页面或数据
样例
- nginx
- 5000 端口:前端页面位于 D:\ABtesting,version1 为 index1.html,version2 为 index2.html,访问http://localhost:5000/v1和http://localhost:5000/v2能访问到2个版本的页面(类似vue部署build文件)
- 4000 端口:给用户访问的 url(http://localhost:4000),代理转发到后端 nodejs 接口(http://localhost:3000)
nginxserver { listen 5000; server_name localhost; root D:\ABtesting; location /v1 { try_files $uri $uri/ /index1.html; } location /v2 { try_files $uri $uri/ /index2.html; } } server { listen 4000; server_name localhost; location / { proxy_pass http://localhost:3000; } }
- nodejs
- 访问http://localhost:3000/checkMVVM,然后访问:域名/版本(mvvm模式)
- 访问http://localhost:3000/checkMVC,然后访问:域名/版本(mvc模式)
- 访问http://localhost:3000/v1,返回版本1页面(mvc模式)
- 访问http://localhost:3000/v2,返回版本2页面(mvc模式)
- 访问http://localhost:3000/其它,返回404
javascriptvar url = require("url"), fs = require("fs"), http=require("http"); http.createServer(function (req, res) { var pathName = url.parse(req.url).pathname.replace(/\//, ''); console.log(pathName); if(pathName.indexOf('v1')>-1){ res.setHeader("Content-Type","text/html;charset='utf-8'"); fs.readFile("./index1.html","utf-8",function(err,data){ if(err) { console.log("index1.html loading is failed :"+err); } else{ res.end(data); } }); } else if(pathName.indexOf('v2')>-1){ res.setHeader("Content-Type","text/html;charset='utf-8'"); fs.readFile("./index2.html","utf-8",function(err,data){ if(err) { console.log("index2.html loading is failed :"+err); } else{ res.end(data); } }); } else if(pathName.indexOf('checkMVVM')>-1){ //假装查询到应该返回哪个版本的页面 if(new Date().getTime() % 2 == 0){ res.writeHead(302,{ 'Location': 'http://localhost:5000/v1' }); res.end(); } else{ res.writeHead(302,{ 'Location': 'http://localhost:5000/v2' }); res.end(); } } else if(pathName.indexOf('checkMVC')>-1){ //假装查询到应该返回哪个版本的页面 if(new Date().getTime() % 2 == 0){ res.writeHead(302,{ 'Location': 'http://localhost:3000/v1' }); res.end(); } else{ res.writeHead(302,{ 'Location': 'http://localhost:3000/v2' }); res.end(); } } else{ res.write('404'); res.end(); } }).listen(3000);
- 页面*:index*.html *表示 1-4
html<!DOCTYPE html> <html> <head> <title>v*</title> </head> <body> <input type="text" value="v*" /> </body> </html>
流程:
运行 windows 下的 nginx.exe
运行 nodejs:node index.js
访问浏览器:
样例文件见文件夹【灰度发布】
消息队列
参考链接:
详解:
在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即 返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
存储过程
参考链接:
详解:
概念
存储过程就是一条或多条 SQL 语句的集合,可视为批文件,但是起作用不仅限于批处理。
优点
- 可封装,可传参
- 减少应用与数据库服务器之间通讯流量及时间
- 相对批量有更高的执行效率
缺点
- 不同数据库切换,需要重写
使用
- 声明与调用
sqlDROP PROCEDURE IF EXISTS `过程名`; CREATE PROCEDURE 过程名([[IN|OUT|INOUT] 参数名 数据类型[,[IN|OUT|INOUT] 参数名 数据类型…]]) 过程体--创建存储过程 --in只能当做传入参数 --out只能当做转出参数 --inout可当做传入转出参数 DELIMITER ;;--声明语句结束符 CREATE PROCEDURE fun(IN s int,OUT p_out int,INOUT p_out2 int) BEGIN SELECT p_out; SET p_out=2; SELECT p_out; SELECT p_out2; SET p_out2=2; SELECT p_out2; END DELIMITER ; SET @p=1; CALL fun(@p) ;--调用存储过程 SET @p_out2=1; CALL funa(@p_out2); SELECT @p_out2;
- 程序逻辑
sqlDROP PROCEDURE IF EXISTS proc; DELIMITER ;; CREATE PROCEDURE proc(IN parameter int) BEGIN--嵌套块使用begin和end DECLARE var int;--声明变量 SET var=parameter+1;--变量赋值 IF var=0 THEN SELECT 0; END IF ; IF parameter=0 THEN select "=0"; ELSE select ">0"; END IF ; CASE param WHEN 0 THEN SELECT 0; WHEN 1 THEN SELECT 1; ELSE SELECT 1; END CASE ; SET var=0; WHILE var<6 DO SELECT var; SET var=var+1; END WHILE ; DECLARE v INT; SET v=0; REPEAT SELECT v; SET v=v+1; UNTIL v>=5 END REPEAT; SET v=0; LOOP_LABLE:LOOP--语句块贴标签 SELECT v; SET v=v+1; IF v >=5 THEN LEAVE LOOP_LABLE; END IF; END LOOP; END ; ;; DELIMITER ; call proc(-1);
- 样例 1
sqlDROP TABLE IF EXISTS `tbl_job`; CREATE TABLE `tbl_job` ( `ID` int(11) NOT NULL AUTO_INCREMENT, `JOB_NAME` varchar(32) NOT NULL COMMENT '职位名称', `OCCUPANT_ID` int(11) NOT NULL COMMENT '任职者', `AGE` int(11) NOT NULL COMMENT '年龄', PRIMARY KEY (`ID`) ); -- ---------------------------- -- Records of tbl_job -- ---------------------------- INSERT INTO `tbl_job` VALUES ('1', '经理', '1', '21'), ('2', '董事长', '2', '21'), ('3', '项目组长', '3', '22'), ('4', 'SE', '4', '24'), ('5', 'MDE', '5', '24'); DROP PROCEDURE IF EXISTS proc_tbl_job; DELIMITER ;; CREATE PROCEDURE proc_tbl_job(IN inId INT) BEGIN DECLARE id INT; DECLARE jobName VARCHAR(32); DECLARE occupantId INT; DECLARE age INT; DECLARE done INT DEFAULT FALSE; DECLARE curJob CURSOR FOR ( -- 定义 SELECT ID,JOB_NAME,OCCUPANT_ID,AGE FROM tbl_job -- WHERE ID = inId ); DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; -- 监听器 OPEN curJob; -- 打开游标 label:LOOP FETCH curJob INTO id,jobName,occupantId,age; IF done THEN LEAVE label;END IF; SELECT id,jobName,occupantId,age; END LOOP label; CLOSE curJob; -- 关闭游标 END ;; delimiter; CALL proc_tbl_job(2)
- 样例 2
sqlDROP PROCEDURE IF EXISTS proc_syn_single_blacklist; DELIMITER ;; CREATE PROCEDURE proc_syn_single_blacklist(IN var_license VARCHAR(10)) BEGIN DECLARE var_calc_amount_owed INT DEFAULT 0; DECLARE var_calc_paid_in_money INT DEFAULT 0; DECLARE var_calc_arrears_count INT DEFAULT 0; SELECT License ,IFNULL(SUM(ReceivablesMoney),0),IFNULL(SUM(PaidInMoney),0),IFNULL(COUNT(*),0) into var_license, var_calc_amount_owed,var_calc_paid_in_money,var_calc_arrears_count FROM urpcs_evasion_arrears WHERE license=var_license; SELECT var_license,var_calc_amount_owed,var_calc_paid_in_money,var_calc_arrears_count; END ;; DELIMITER; DROP PROCEDURE IF EXISTS proc_syn_blacklist; DELIMITER ;; CREATE PROCEDURE proc_syn_blacklist() BEGIN DECLARE var_license VARCHAR(10); DECLARE done INT DEFAULT FALSE; DECLARE curJob CURSOR FOR ( -- 定义 SELECT DISTINCT license FROM urpcs_evasion_arrears ORDER BY EvasionID LIMIT 2 ); DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; -- 监听器 OPEN curJob; -- 打开游标 label:LOOP FETCH curJob INTO var_license; IF done THEN LEAVE label;END IF; call proc_syn_single_blacklist(var_license); END LOOP label; CLOSE curJob; -- 关闭游标 END ;; DELIMITER; CALL proc_syn_blacklist();
数据库读写分离
参考链接:
详解:
将数据库分为主从库,一个主库用于写数据,多个从库完成读数据的操作,主从库之间通过某种机制进行数据的同步,提升数据库的读写性能
mysql存储在磁盘里,redis存储在内存里,redis既可以用来做持久存储,也可以做缓存,而目前大多数公司的存储都是mysql + redis,mysql作为主存储,redis作为辅助存储被用作缓存,加快访问读取的速度,提高性能
那么为什么不直接全部用redis存储呢?
因为redis存储在内存中,如果存储在内存中,存储容量肯定要比磁盘少很多,那么要存储大量数据,只能花更多的钱去购买内存,造成在一些不需要高性能的地方是相对比较浪费的,所以目前基本都是mysql(主) + redis(辅),在需要性能的地方使用redis,在不需要高性能的地方使用mysql,好钢用在刀刃上
同步策略
先更新mysql数据,再通过消息队列更新redis缓存
SQL 分页
参考链接:
详解:
- 三重循环
sqlselect * from (select top pageSize * from (select top (pageIndex*pageSize) * from Questions order by id asc ) as q1 order by id desc ) as q2 order by id asc
- 利用 max(id)
sqlselect top pageSize * from Questions where id >= (select max(id) from (select top ((pageIndex-1)*pageSize+1) id from Questions order by id asc) as q1) order by id;
- 利用 row_number
sqlselect top pageSize * from (select row_number() over(order by id asc) as rownumber,* from Questions) q1 where rownumber>((pageIndex-1)*pageSize);
- offset /fetch next(2012 版本及以上才有)
sqlselect * from Questions order by id offset ((pageIndex-1)*pageSize) rows fetch next pageSize rows only;
- 封装的存储过程
sqlcreate procedure paging_procedure ( @pageIndex int, @pageSize int ) as begin --实现方法 end --调用 exec paging_procedure @pageIndex=2,@pageSize=8;
SQL 索引原理
参考链接:
详解:
索引的生成
把平铺堆叠的数据变为平衡树,查询时无需逐一查找,而是变为 n 分查找,提高了查询效率,但增删改导致数据变化,因此需要梳理平衡树,导致降低了增删改效率。
聚集索引:主键
非聚集索引:index
sql
create index index_name on table_name(column_name);
数据库查询优化
参考链接:
详解:
- 在 where 及 order by 涉及的列上建立索引
- 避免在 where 子句中对字段进行 null 值判断,否则将导致引擎放弃使用索引而进行全表扫描,可设置默认值 0 并判断相等
sqlselect id from t where num is null --改为 select id from t where num=0
避免在 where 子句中使用!=或<>操作符,否则将引擎放弃使用索引而进行全表扫描
避免在 where 子句中使用 or 来连接条件,否则将导致引擎放弃使用索引而进行全表扫描
sqlselect id from t where num=10 or num=20 --改为 select id from t where num=10 union all select id from t where num=20
- in 和 not in 也要慎用,否则会导致全表扫描
sqlselect id from t where num in(1,2,3) --改为 select id from t where num between 1 and 3
- like 问题
sqlselect id from t where name like '%abc%' --改为 --若要提高效率,可以考虑全文索引
- where 子句中使用参数,也会导致全表扫描。因为 SQL 只有在运行时才会解析局部变量,但优化程序不能将访问计划的选择推迟到运行时;它必须在编译时进行选择。然而,如果在编译时建立访问计划,变量的值还是未知的,因而无法作为索引选择的输入项。
sqlselect id from t wherenum=@num --改为 select id from t with(index(索引名)) wherenum= @num
- 避免在 where 子句中对字段进行表达式操作,这将导致引擎放弃使用索引而进行全表扫描
sqlselect id from t where num/2=100 --改为 select id from t where num=100*2
- 避免在 where 子句中对字段进行函数操作,这将导致引擎放弃使用索引而进行全表扫描
sqlselect id from t where substring(name,1,3)='abc'--name以abc开头的id select id from t where datediff(day,createdate,'2005-11-30')=0--‘2005-11-30’生成的id
锁
参考链接:
详解:
业务场景
秒杀,是短时间内多个用户“争抢”资源(商品),多个线程对资源进行操作,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。
秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,最简单直接的实现就是在这个方法上加上锁,如只锁住秒杀的代码块,比如写数据库的部分;让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作
但这样粗暴加锁会使锁粒度偏高,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,是可以同时进行的。
各种锁
悲观锁
担心拿数据时被别人修改,所以查询时先加锁在修改,保证操作时别人修改不了,期间需要访问该数据的都会等待。
共享锁
又称为读锁,可以查看但无法修改和删除的一种数据锁。(读取)操作创建的锁。其他用户可以并发读取数据,但不能修改,增加,删除数据。资源共享。
排他锁
又称为写锁,其他线程对该记录的更新与删除操作都会阻塞等待。
乐观锁
每次拿数据的时候都完全不担心会被别人修改,所以不会上锁,但是在更新数据的时候去判断该期间是否被别人修改过(使用版本号等机制,靠表设计和代码来实现)
在更新之前,先查询一下库存表中当前版本号,在做update的时候,以版本号作为修改条件,提交更新的时候,判断数据库表对应记录的当前库存数与第一次取出来的库存数进行比对,如果数据库表当前库存数与第一次取出来的库存数相等,则予以更新,否则认为是过期数据。
悲观锁:用于写比较多的情况,避免了乐观锁不断重试从而降低性能
乐观锁:用于读比较多的情况,避免了不必要的加锁的开销
CAS与synchronized
CAS属于乐观锁,适用于写比较少的情况,冲突较少
synchronized属于悲观锁,适用于冲突写比较多的情况,
冲突较多竞争较少的场景:synchronized会阻塞和唤醒线程并在用户态和内核态切换浪费消耗cpu资源。
CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。
竞争严重的场景:CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized。
java开发手册:如果线程访问冲突小于20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不小于3次。
分布式锁
synchronized 只是本地锁,锁的也只是当前jvm下的对象,在分布式场景下,要用分布式锁。
锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
线程池
参考链接:
详解:
线程池的好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
使用场景
线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。
线程池配置越多越好吗?
并不是人多就能把事情做好,反而增加了沟通交流成本。线程数量过多增加了上下文切换成本。
上下文切换
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。
当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
线程池数量太小会怎样?
如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OutOfMemory。CPU 根本没有得到充分利用。
涉及合理的线程池
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
线程池的核心参数
- corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
- maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。
node爬虫
参考链接:
详解:
javascript// 请求 url - > html(信息) -> 解析html const https = require('https'); const cheerio = require('cheerio'); const fs = require('fs'); // 请求 top250 // 浏览器输入一个 url, get https.get('https://movie.douban.com/top250',function(res){ // console.log(res); // 分段返回的 自己拼接 let html = ''; // 有数据产生的时候 拼接 res.on('data',function(chunk){ html += chunk; }) // 拼接完成 res.on('end',function(){ console.log(html); const $ = cheerio.load(html); let allFilms = []; $('li .item').each(function(){ // this 循环时 指向当前这个电影 // 当前这个电影下面的title // 相当于this.querySelector const title = $('.title', this).text(); const star = $('.rating_num',this).text(); const pic = $('.pic img',this).attr('src'); // console.log(title,star,pic); // 存 数据库 // 没有数据库存成一个json文件 fs allFilms.push({ title,star,pic }) }) // 把数组写入json里面 fs.writeFile('./films.json', JSON.stringify(allFilms),function(err){ if(!err){ console.log('文件写入完毕'); } }) // 图片下载一下 downloadImage(allFilms); }) }) function downloadImage(allFilms) { for(let i=0; i<allFilms.length; i++){ const picUrl = allFilms[i].pic; // 请求 -> 拿到内容 // fs.writeFile('./xx.png','内容') https.get(picUrl,function(res){ res.setEncoding('binary'); let str = ''; res.on('data',function(chunk){ str += chunk; }) res.on('end',function(){ fs.writeFile(`./images/${i}.png`,str,'binary',function(err){ if(!err){ console.log(`第${i}张图片下载成功`); } }) }) }) } }
后端获取客户IP
参考链接:
详解:
如果有 x-forwarded-for 的请求头,则取其中的第一个 IP,否则取建立连接 socket 的 remoteAddr。
textX-Forwarded-For: 203.0.113.195, 70.41.3.18, 150.172.238.178 X-Forwarded-For: <client>, <proxy1>, <proxy2>
koa
javascriptget ips() { const proxy = this.app.proxy; const val = this.get(this.app.proxyIpHeader); let ips = proxy && val ? val.split(/\s*,\s*/) : []; if (this.app.maxIpsCount > 0) { ips = ips.slice(-this.app.maxIpsCount); } return ips; }, get ip() { if (!this[IP]) { this[IP] = this.ips[0] || this.socket.remoteAddress || ''; } return this[IP]; },
获取系统并发用户数
参考链接:
详解:
通过 redis 的 zset 可实现,当一个用户请求任何接口时,实现一个 middleware
javascript// 当一个用户访问任何接口时,对该用户Id,写入 zset await redis.zadd(`Organization:${organizationId}:concurrent`, Date.now(), `User:${userId}`) // 查询当前机构的并发数 // 通过查询一分钟内的活跃用户来确认并发数,如果超过则抛出特定异常 const activeUsers = await redis.zrangebyscore(`Organization:${organizationId}:concurrent`, Date.now() - 1000 * 60, Date.now()) // 查出并发数 const count = activeUsers.length // 删掉过期的用户 await redis.zrembyscore(`Organization:${organizationId}:concurrent`, Date.now() - 1000 * 60, Date.now())
高并发处理中缓存、降级、限流技术
参考链接:
详解:
缓存
浏览器缓存是指当我们使用浏览器访问一些网站页面或者http服务时,根据服务端返回的缓存设置响应头将响应内容缓存到浏览器,下次可以直接使用缓存内容或者仅需要去服务端验证内容是否过期即可。
可以减少浏览器和服务端之间来回传输的数据量,节省带宽提升性能。
像apache traffic server、squid、varnish、nginx等技术都可以来进行内容缓存。还有CDN就是用来加速用户访问。
降级
当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级。
降级的最终目的是保证核心服务可用,即使是有损的。而且有些服务是无法降级的(如加入购物车、结算)。
设置预案:
一般:比如有些服务偶尔因为网络抖动或者服务正在上线而超时,可以自动降级;
警告:有些服务在一段时间内成功率有波动(如在95~100%之间),可以自动降级或人工降级,并发送告警;
错误:比如可用率低于90%,或者数据库连接池被打爆了,或者访问量突然猛增到系统能承受的最大阀值,此时可以根据情况自动降级或者人工降级;
严重错误:比如因为特殊原因数据错误了,此时需要紧急人工降级。
限流
限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。v
一般开发高并发系统常见的限流有:
限制总并发数(比如数据库连接池、线程池)
限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)
限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率)
其他还有如限制远程接口调用速率、限制MQ的消费速率、根据网络连接数、网络流量、CPU或内存负载等来限流。
常见算法:
计数器算法
对于A接口来说,我们1分钟的访问次数不能超过100个。
开始时设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个请求的间隔时间还在1分钟之内,那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter
缺点:
有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。
滑动窗口
一个时间窗口就是一分钟。然后我们将时间窗口进行划分,将滑动窗口 划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。
解决计数器算法缺点:
0:59到达的100个请求,当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。
当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
令牌桶算法
- 所有的请求在处理之前都需要拿到一个可用的令牌才会被处理;
- 根据限流大小,设置按照一定的速率往桶里添加令牌;
- 桶设置最大的放置令牌限制,当桶满时、新添加的令牌就被丢弃或者拒绝;
- 请求达到后首先要获取令牌桶中的令牌,拿着令牌才可以进行其他的业务逻辑,处理完业务逻辑之后,将令牌直接删除;
- 令牌桶有最低限额,当桶中的令牌达到最低限额的时候,请求处理完之后将不会删除令牌,以此保证足够的限流;
漏桶算法
往桶中以一定速率流出水,以任意速率流入水,当水超过桶流量则丢弃,因为桶容量是不变的,保证了整体的速率。
数据库性能优化指导
参考链接:
详解:
合理使用索引,结合查询情况创建组合索引
考虑在 where 及 order by 涉及的列上建立索引,结合查询的情况,提高文档的查询、更新、删除、排序操作,尽量避免全表扫描
对于包含多个字段(键)条件的查询,创建包含这些字段的组合索引
尽量避免在索引列上使用mysql的内置函数
sql--反例: select userId,loginTime from loginuser where Date_ADD(loginTime,Interval 7 DAY) >=now(); --正例: explain select userId,loginTime from loginuser where loginTime >= Date_ADD(NOW(),INTERVAL - 7 DAY);
索引不宜太多,一般5个以内
- 索引并不是越多越好,索引虽然提高了查询的效率,但是也降低了插入和更新的效率。
- insert或update时有可能会重建索引,所以建索引需要慎重考虑,视具体情况来定。
- 一个表的索引数最好不要超过5个,若太多需要考虑一些索引是否没有存在的必要。
索引不适合建在有大量重复数据的字段上,如性别这类型数据库字段
查询时要尽可能通过条件和 limit 限制数据,并select具体字段而非*
当偏移量特别大的时候,查询效率也会变得低
分页
sql--反例: select id,name,age from employee limit 10000,10 --正例: --方案一 :返回上次查询的最大记录(偏移量) select id,name from employee where id>10000 limit 10. --方案二:order by + 索引 select id,name from employee order by id limit 10000,10 --方案三:在业务允许的情况下限制页数:
尽可能限制返回的字段等数据量
减少网络流量和客户端的内存使用
查询量大时建议不要用正则查询
正则表达式查询不能使用索引,执行的时间比大多数选择器更长
使用正则查询也一定要尽可能的缩写模糊匹配的范围,比如使用开始匹配符 ^ 或结束匹配符 $
经常根据地址来筛选客户来源,那你应该在数据库对数据进行处理,比如 province 和 city 来清洗重组数据从而替代模糊查询
禁止遍历取出所有数据再来排序的愚蠢行为
尽量少在业务量大的地方用以下查询指令
判断字段是否存在的exists,要求值不在给定的数组内的nin,表示需满足任意多个查询筛选条件or,表示需不满足指定的条件not
会使索引失效,从而全表扫描
对不用的数据或过期的数据可以进行定期归档并删除
不能一次查询多个指令、正则查询套来套去
使用短字段名
增加冗余字段
查询时会存在计算、跨表等问题,这个时候建议新增一些冗余字段
虚假删除:delete:false
尽量不要把数据库请求放到循环体内
先一次性查询多条数据,在循环体内对数据进行处理之后再一次性写回数据库
sql--反例: for(User u :list){ INSERT into user(name,age) values(#name#,#age#) } --正例: --一次500批量插入,分批进行 insert into user(name,age) values <foreach collection="list" item="item" index="index" separator=","> (#{item.name},#{item.age}) </foreach>
尽量使用一个数据库请求代替多个数据库请求
预留空字段,已备后续使用
都满足SQL需求的前提下,推荐优先使用Inner join(内连接),如果要使用left join,左边表数据结果尽量小,如果有条件的尽量放到左边处理。
distinct 字段尽量少
sql--反例: SELECT DISTINCT * from user; --正例: select DISTINCT name from user;
一次性删除太多数据,可能会有lock wait timeout exceed的错误,所以建议分批操作。
sql--反例: --一次删除10万或者100万+? delete from user where id <100000; --或者采用单一循环操作,效率低,时间漫长 for(User user:list){ delete from user; } --正例: --分批进行删除,如每次500 delete user where id<500 delete product where id>=500 and id<1000;
如果检索结果中不会有重复的记录,推荐union all 替换 union
当在SQL语句中连接多个表时,请使用表的别名,并把别名前缀于每一列上,这样语义更加清晰。
尽可能使用varchar/nvarchar 代替 char/nchar
- 因为首先变长字段存储空间小,可以节省存储空间。
- 其次对于查询来说,在一个相对较小的字段内搜索,效率更高。
为了提高group by 语句的效率,可以在执行到该语句前,把不需要的记录过滤掉
sql--反例: select job,avg(salary) from employee group by job having job ='president' or job = 'managent' --正例: select job,avg(salary) from employee where job ='president' or job = 'managent' group by job;
字段类型是字符串,where时一定用引号括起来,否则索引失效
不加单引号时,是字符串跟数字的比较,它们类型不匹配,MySQL会做隐式的类型转换,把它们转换为浮点数再做比较。
sql--反例: select * from user where userid =123; --正例: select * from user where userid ='123';
使用explain 分析SQL
sqlexplain select * from user where userid =10086 or age =18;
阿里云消息队列RocketMQ
参考链接:
详解:
概念
MQ 的本质
转发器:一发一存一消费:生产者发消息->队列存消息->消费者收消息
消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
队列:一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
发布-订阅模型
如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。
一个可行的方案是:为每个消费者创建一个单独的队列,让生产者发送多份。这种做法比较笨,而且同一份数据会被复制多份,也很浪费空间。
存放消息的容器变成了 “主题topic”,订阅者在接收消息之前需要先 “订阅主题”。
它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费。
场景例子
电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。
系统解耦
引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。
异步通信
更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。
消息类型
- 普通消息:普通消息适用于系统间异步解耦、削峰填谷、日志服务、大规模机器的Cache同步以及实时计算分析等场景
- 事务消息:事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致
- 分区顺序消息:消息根据Sharding Key进行分区,同一个分区的消息将严格按照先入先出的方式进行顺序发布和顺序消费,可以提高并发度和整体性能
- 全局顺序消息:所有消息将严格按照先入先出的顺序,进行顺序发布和顺序消费
- 定时/延时消息:定时消息是指将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定时间点进行投递,例如指定在2016/01/01 15:00:00进行消息投递。延时消息是指将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定延迟时间点进行投递,比如指定在消息发送时间的30分钟之后进行投递
详细设计
难点1:RPC 通信
直接利用成熟的 RPC 框架 Dubbo 或者 Thrift 实现即可,这样不需要考虑服务注册与发现、负载均衡、通信协议、序列化方式等一系列问题
也可以基于 Netty 来做底层通信,用 Zookeeper、Euraka 等来做注册中心,然后自定义一套新的通信协议(类似 Kafka),也可以基于 AMQP 这种标准化的 MQ 协议来做实现(类似 RabbitMQ)。
难点2:高可用设计
- Broker 服务的高可用,只需要保证 Broker 可水平扩展进行集群部署即可,进一步通过服务自动注册与发现、负载均衡、超时重试机制、发送和消费消息时的 ack 机制来保证。
- 存储方案的高可用:
- 参考 Kafka 的分区 + 多副本模式,但是需要考虑分布式场景下数据复制和一致性方案(类似 Zab、Raft等协议),并实现自动故障转移;
- 还可以用主流的 DB、分布式文件系统、带持久化能力的 KV 系统,它们都有自己的高可用方案。
难点3:存储设计
追加写日志文件(数据部分) + 索引文件的方式(很多主流的开源 MQ 都是这种方式),索引设计上可以考虑稠密索引或者稀疏索引,查找消息可以利用跳转表、二份查找等,还可以通过操作系统的页缓存、零拷贝等技术来提升磁盘文件的读写性能。
难点4:消费关系管理
可以基于 Zookeeper、Apollo 等配置中心来管理以及进行变更通知。
难点5:高性能设计
Reactor 网络 IO 模型、业务线程池的设计、生产端的批量发送、Broker 端的异步刷盘、消费端的批量拉取
开通服务 -> 用户授权 -> 创建实例 -> 控制台创建topic -> 创建tcp/http group id -> 获取tcp/http接入点 -> 下载tcp/http sdk -> 运行示例代码
流程注意
- 创建的同一个Group ID不能混用于TCP协议和HTTP协议
- 分别获取TCP协议和HTTP协议的SDK来使用对应协议的接入点,不能混用
- TCP协议客户端接入点仅在公网地域有公网接入点,其余地域只提供内网接入点,HTTP协议在各地域均提供公网和内网接入点
- 如果您的应用有跨地域使用消息队列RocketMQ版的场景,推荐您使用HTTP协议
创建资源
- 创建Topic:消息的一级归类,例如创建Topic_Trade这一Topic来识别交易类消息
- 创建Group ID:为消息的消费者(或生产者)创建客户端ID,即Group ID作为标识
- Group ID和Topic的关系是N:N,即一个消费者可以订阅多个Topic,同一个Topic也可以被多个消费者订阅;一个生产者可以向多个Topic发送消息,同一个Topic也可以接收来自多个生产者的消息。
- 获取接入点
注意
- 定时消息的精度会有1s~2s的延迟误差。
- 定时和延时消息的msg.setStartDeliverTime参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
- 定时和延时消息的msg.setStartDeliverTime参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败。
- StartDeliverTime是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
- 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。
- 设置定时和延时消息的投递时间后,依然受3天的消息保存时长限制。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第8天被删除。
使用说明
在线接口调用
外部排序
场景
javascript
// 有一个 10G 文件,每一行是一个时间戳,
// 现在要在一台 2C4G 的机器上对它进行排序,输出排序以后的文件
// 案例输入
// 1570593273487
// 1570593273486
// 1570593273488
// …
// 输出
// 1570593273486
// 1570593273487
// 1570593273488
// …
错误示例:10GB 的文件无法一次性放进内存里处理,内存只有 4GB
javascript
async function sort(inputFile, outputFile) {
const input = fs.createReadStream(inputFile);
const rl = readline.createInterface({ input });
const arr = [];
for await (const line of rl) {
const item = Number(line);
arr.push(item);
}
arr.sort((a, b) => a - b);
fs.writeFileSync(outputFile, arr.join('\n'));
}
最简短代码(linux 实现外部排序)
javascript
const cp = require('child_process');
function sort(inputFile, outputFile) {
cp.exec(`sort -n ${inputFile} > ${outputFile}`);
}
外部排序思路
text
1. 每次读取内存可容纳的数据出来
2. 在内存中对每块数据进行归并排序,并输出文件片段,得到各段不相关的有序数据
3. 利用归并排序思想,两两读取数据块每行数据(x(i),y(j))进行归并(向新数组插入数据),内存没满时,先存在内存,内存满了,输出一部分有序数据到新文件,然后继续
4. 那么每两个各自有序的数据会变成两个连续有序的数据,对新文件编好号
5. 把(a,b)视为整体,a文件读完就马上从b文件读起,对(a,b)和(c,d)再次进行合并,生成(e,f,g,h)4个连续有序文件,如此类推
6. 最终合并(m1,m2,m3,...)和(n1,n2,n3,...),通过不断追加内容,得到排好序的大文件
实现
index
javascript
const debug = require('debug')('externalSort');
const os = require('os');
const readline = require('readline');
const HeapNode = require('./heap_node');
const { initHeap, heapify } = require('./heapify');
const { writeToTempFile, searchInsert } = require('./utils');
const ONE_GB = 1024 * 1024 * 1024;
const defaultOptions = {
maxHeap: ONE_GB,
tempDir: os.tmpdir(),
serializer: num => num + '\n',
deserializer: line => (line ? Number(line.trim()) : null),
comparer: (a, b) => (a - b),
};
/**
* 外排序实现
*
* @param {Readable} input 输入的流
* @param {Writable} output 输出的流
* @param {Object} options
* - {Number} maxHeap 内存排序的上限,超过需要借助文件
* - {String} tempDir 临时文件目录
* - {Function} serializer 回写文件时的函数
* - {Function} deserializer 处理一行输入的函数
* - {Function} comparer 排序的比较函数
* @return {Promise} { void }
*/
async function externalSort(input, output, options) {
const { maxHeap, tempDir, serializer, deserializer, comparer } = Object.assign({}, defaultOptions, options);
const rl = readline.createInterface({
input,
});
let size = 0;
let arr = [];
let fileCount = 0;
const files = [];
for await (const line of rl) {
debug(`Line from file: ${line}`);
const len = Buffer.byteLength(line, 'utf8');
size += len;
const item = deserializer(line);
if (size >= maxHeap) {
files.push(await writeToTempFile(arr, tempDir, fileCount, serializer));
fileCount += 1;
size = len;
arr = [ item ];
} else {
searchInsert(arr, item, comparer);
}
}
if (arr.length) {
files.push(await writeToTempFile(arr, tempDir, fileCount, serializer));
}
const harr = await Promise.all(files.map(async file => {
const node = new HeapNode(file, {
deserializer,
});
await node.nextLine();
return node;
}));
initHeap(harr, comparer);
let first;
do {
first = harr[0];
if (!first || first.item == null) break;
output.write(`${serializer(first.item)}`);
await first.nextLine();
heapify(harr, 0, harr.length, comparer);
} while (first && first.item != null);
return new Promise(resolve => {
output.once('close', resolve);
output.end();
});
}
module.exports = externalSort;
heap_node
javascript
const fs = require('fs');
const readline = require('readline');
const awaitFirst = require('await-first');
const defaultOptions = {
highWaterMark: 64 * 1024,
deserializer: line => line,
};
class HeapNode {
constructor(file, options) {
this._options = Object.assign({}, defaultOptions, options);
this._input = fs.createReadStream(file, { highWaterMark: this._options.highWaterMark });
this._rl = readline.createInterface({
input: this._input,
});
this._isEnd = false;
this._cache = [];
this._item = null;
this._rl.on('line', line => {
this._cache.push(line);
if (!this._input.isPaused()) {
this._input.pause();
}
});
this._rl.once('close', () => {
this._isEnd = true;
});
}
get item() {
return this._item;
}
async nextLine() {
if (this._cache.length) {
const line = this._cache.shift();
this._item = this._options.deserializer(line);
return line;
} else if (this._isEnd) {
this._item = null;
return null;
}
this._input.resume();
await awaitFirst(this._rl, [ 'line', 'close' ]);
return this.nextLine();
}
}
module.exports = HeapNode;
heapify
javascript
function swap(harr, a, b) {
const temp = harr[a];
harr[a] = harr[b];
harr[b] = temp;
}
function heapify(harr, index, size, comparer) {
const pos = index * 2;
const left = pos + 1;
const right = pos + 2;
let first = index;
if (left < size && harr[left].item !== null && comparer(harr[left].item, harr[first].item) < 0) {
first = left;
}
if (right < size && harr[right].item !== null && comparer(harr[right].item, harr[first].item) < 0) {
first = right;
}
if (first !== index) {
swap(harr, index, first);
heapify(harr, first, size, comparer);
}
}
exports.heapify = heapify;
exports.initHeap = (harr, comparer) => {
const heapSize = harr.length;
let i = Math.floor((heapSize - 1) / 2);
while (i >= 0) heapify(harr, i--, heapSize, comparer);
};
utils
javascript
const path = require('path');
const fs = require('fs').promises;
const mkdirp = require('mz-modules/mkdirp');
function findInsertPos(arr, item, comparer) {
let left = 0;
let right = arr.length - 1;
while (left <= right) {
const mid = Math.floor(left + (right - left) / 2);
if (comparer(arr[mid], item) === 0) {
return mid;
} else if (comparer(arr[mid], item) > 0) {
right = mid - 1;
} else {
left = mid + 1;
}
}
return left;
}
function searchInsert(arr, item, comparer) {
const pos = findInsertPos(arr, item, comparer);
arr.splice(pos, 0, item);
}
exports.searchInsert = searchInsert;
async function writeToTempFile(arr, tempDir, index, serializer) {
await mkdirp(tempDir);
const tempFile = path.join(tempDir, `external-sort.${index}`);
await fs.writeFile(tempFile, arr.map(serializer).join(''));
return tempFile;
}
exports.writeToTempFile = writeToTempFile;
图状数据库GraphQL
概念
GraphQL 是一种针对 Graph(图状数据)进行查询特别有优势的查询语言
应用场景
请求如下信息:
<ul> <li>我的名字</li> <li>我的头像</li> <li>我的好友(按他们跟你的亲疏程度排序取前 6):</li> <ul> <li>好友 1 的名字、头像及链接</li> <li>好友 2 的名字、头像及链接</li> <li>……</li> </ul> <li>我的照片(按时间倒序排序取前 6):</li> <ul> <li>照片 1 及其链接</li> <li>照片 2 及其链接</li> <li>……</li> </ul> <li>我的帖子(按时间倒序排序):</li> <ul> <li>帖子 1:</li> <ul> <li>帖子 1 内容</li> <li>帖子 1 评论:</li> <ul> <li>帖子 1 评论 1:</li> <ul> <li>帖子 1 评论 1 内容</li> <li>帖子 1 评论 1 作者名字</li> <li>帖子 1 评论 1 作者头像</li> </ul> <li>帖子 1 评论 2:</li> <ul> <li>……</li> </ul> <li>……</li> </ul> </ul> <li>帖子 2:</li> <ul> <li>帖子 2 内容</li> <li>帖子 2 评论:</li> <ul> <li>……</li> </ul> </ul> <li>……</li> </ul> </ul>
如果我们用常见的 RESTful API 设计,每个 API 负责请求一种类型的对象,例如用户是一个类型,帖子是另一个类型,那就需要非常多个请求才能把这个页面所需的所有数据拿回来。
而且这些请求直接还存在依赖关系,不能平行地发多个请求,例如说在获得帖子数据之前,无法请求评论数据;在获得评论数据之后,才能开始请求评论作者数据。
一个简单粗暴的办法是专门写一个 RESTful API,请求上述树状复杂数据。
但很快新问题就会出现。现在 Facebook 想要做一个新的产品,例如说是宠物,然后要在我的页面上显示我的宠物信息,那这个 RESTful API 的实现就要跟着改。
GraphQL 能够很好地解决这个问题,但前提是数据已经以图的数据结构进行保存。
例如上面说到的用户、帖子、评论是顶点,而用户跟用户发过的帖子存在边的关系,帖子跟评论存在一对多的边,评论跟用户存在一对一的边。
这时候如果新产品引入了新的对象类型(也就是顶点类型)和新的边类型,那没有关系。在查询数据时用 GraphQL 描述一下要查询的这些边和顶点就行,不需要去改 API 实现。
更多
GraphQL内置了基于隐私设置的访问控制
例如说你发的帖子有些是所有人可见的、有些是好友可见的、有些是仅同事可见的,我在打开你的页面时 Facebook 有一个中间层保证了根据我和你的关系我只能看到我该看到的帖子。
并不是所有场景都适用于 GraphQL
围绕单项信息的增删改查可以用 RESTful API 来实现。
实际获得的数据视图是树状数据结构
每一个 GraphQL 查询或更新都有自己的根节点,然后所有的数据都是从根结点展开出去的。查询后获得的数据如果要在前端重新变回图的状态,那前端就不能简单地缓存查询得到的数据,必须通过顶点的 ID 把不同节点之间的某些边重新连接起来。
延伸
graphiql一个让我们在浏览器里用图形交互的方式探索及书写GraphQL的 IDE
graphql-voyager能看见GraphQL背后的 graph 了
OAuth
参考链接:
详解:
概念
OAuth是一个关于授权(authorization)的开放网络标准,一般用于跨网站授权登录。
应用场景
网站A需要读取保存在网站B的资源,因此需要得到B的授权,才能获取。
传统方法是在A输入B的账号密码,但这样不安全:
- A会把B的账号密码保存下来,用户信息泄露,A被黑客破解后,也能得到B的账号密码
- A有账号密码后,将得到B的所有权限
- 用户只能修改密码才能收回权限,但会导致其它授权网站失效
OAuth就是为了解决上面这些问题,让"客户端"安全可控地获取"用户"的授权,与"服务商提供商"进行互动
形象场景
外卖员授权进入小区送外卖。
如果我把自己的密码,告诉快递员,他就拥有了与我同样的权限,这样好像不太合适。万一我想取消他进入小区的权力,也很麻烦,我自己的密码也得跟着改了,还得通知其他的快递员。
有没有一种办法,让快递员能够自由进入小区,又不必知道小区居民的密码,而且他的唯一权限就是送货,其他需要密码的场合,他都没有权限?
授权机制:
门禁系统的密码输入器下面,增加一个按钮,叫做"获取授权"。快递员需要首先按这个按钮,去申请授权。
他按下按钮以后,屋主(也就是我)的手机就会跳出对话框:有人正在要求授权。系统还会显示该快递员的姓名、工号和所属的快递公司。我确认请求属实,就点击按钮,告诉门禁系统,我同意给予他进入小区的授权。
门禁系统得到我的确认以后,向快递员显示一个进入小区的令牌(access token)。令牌就是类似密码的一串数字,只在短期内(比如七天)有效。
快递员向门禁系统输入令牌,进入小区。
为什么不是远程为快递员开门,而要为他单独生成一个令牌?
这是因为快递员可能每天都会来送货,第二天他还可以复用这个令牌。另外,有的小区有多重门禁,快递员可以使用同一个令牌通过它们。
OAuth的思路
OAuth在"客户端"与"服务提供商"之间,设置了一个授权层(authorization layer)。"客户端"不能直接登录"服务提供商",只能登录授权层,以此将用户与客户端区分开来。"客户端"登录授权层所用的令牌(token),与用户的密码不同。用户可以在登录的时候,指定授权层令牌的权限范围和有效期。
"客户端"登录授权层以后,"服务提供商"根据令牌的权限范围和有效期,向"客户端"开放用户储存的资料。
OAuth 2.0流程
用户打开客户端以后,客户端要求用户给予授权。
用户同意给予客户端授权。
客户端的授权模式
授权码模式(authorization code)
功能最完整、流程最严密的授权模式。它的特点就是通过客户端的后台服务器,与"服务提供商"的认证服务器进行互动。
流程
相关参数使用JSON格式发送(Content-Type: application/json),HTTP头信息中明确指定不得缓存。
用户访问客户端,后者将前者导向认证服务器。
客户端申请认证的URI
- response_type:表示授权类型,必选项,此处的值固定为"code"
- client_id:表示客户端的ID,必选项
- redirect_uri:表示重定向URI,可选项
- scope:表示申请的权限范围,可选项
- state:表示客户端的当前状态,可以指定任意值,认证服务器会原封不动地返回这个值。
样例
GET /authorize?response_type=code&client_id=s6BhdRkqt3&state=xyz&redirect_uri=https%3A%2F%2Fclient%2Eexample%2Ecom%2Fcb HTTP/1.1 Host: server.example.com
用户选择是否给予客户端授权。
假设用户给予授权,认证服务器将用户导向客户端事先指定的"重定向URI"(redirection URI),同时附上一个授权码。
服务器回应客户端的URI
- code:表示授权码,必选项。该码的有效期应该很短,通常设为10分钟,客户端只能使用该码一次,否则会被授权服务器拒绝。该码与客户端ID和重定向URI,是一一对应关系。
- state:如果客户端的请求中包含这个参数,认证服务器的回应也必须一模一样包含这个参数。
样例
HTTP/1.1 302 Found Location: https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=xyz
客户端收到授权码,附上早先的"重定向URI",向认证服务器申请令牌。这一步是在客户端的后台的服务器上完成的,对用户不可见。
客户端向认证服务器申请令牌的HTTP请求
- grant_type:表示使用的授权模式,必选项,此处的值固定为"authorization_code"。
- code:表示上一步获得的授权码,必选项。
- redirect_uri:表示重定向URI,必选项,且必须与A步骤中的该参数值保持一致。
- client_id:表示客户端ID,必选项。
样例
textPOST /token HTTP/1.1 Host: server.example.com Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW Content-Type: application/x-www-form-urlencoded grant_type=authorization_code&code=SplxlOBeZQQYbYS6WxSbIA &redirect_uri=https%3A%2F%2Fclient%2Eexample%2Ecom%2Fcb
认证服务器核对了授权码和重定向URI,确认无误后,向客户端发送访问令牌(access token)和更新令牌(refresh token)。
认证服务器发送的HTTP回复
- access_token:表示访问令牌,必选项。
- token_type:表示令牌类型,该值大小写不敏感,必选项,可以是bearer类型或mac类型。
- expires_in:表示过期时间,单位为秒。如果省略该参数,必须其他方式设置过期时间。
- refresh_token:表示更新令牌,用来获取下一次的访问令牌,可选项。
- scope:表示权限范围,如果与客户端申请的范围一致,此项可省略。
样例
textHTTP/1.1 200 OK Content-Type: application/json;charset=UTF-8 Cache-Control: no-store Pragma: no-cache { "access_token":"2YotnFZFEjr1zCsicMWpAA", "token_type":"example", "expires_in":3600, "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter":"example_value" }
简化模式(implicit)
不通过第三方应用程序的服务器,直接在浏览器中向认证服务器申请令牌,跳过了"授权码"这个步骤
流程
客户端将用户导向认证服务器。
客户端发出的HTTP请求
- response_type:表示授权类型,此处的值固定为"token",必选项。
- client_id:表示客户端的ID,必选项。
- redirect_uri:表示重定向的URI,可选项。
- scope:表示权限范围,可选项。
- state:表示客户端的当前状态,可以指定任意值,认证服务器会原封不动地返回这个值。
样例
GET /authorize?response_type=token&client_id=s6BhdRkqt3&state=xyz&redirect_uri=https%3A%2F%2Fclient%2Eexample%2Ecom%2Fcb HTTP/1.1 Host: server.example.com
用户决定是否给于客户端授权。
假设用户给予授权,认证服务器将用户导向客户端指定的"重定向URI",并在URI的Hash部分包含了访问令牌。
认证服务器回应客户端的URI
- access_token:表示访问令牌,必选项。
- token_type:表示令牌类型,该值大小写不敏感,必选项。
- expires_in:表示过期时间,单位为秒。如果省略该参数,必须其他方式设置过期时间。
- scope:表示权限范围,如果与客户端申请的范围一致,此项可省略。
- state:如果客户端的请求中包含这个参数,认证服务器的回应也必须一模一样包含这个参数。
样例
HTTP/1.1 302 Found Location: http://example.com/cb#access_token=2YotnFZFEjr1zCsicMWpAA&state=xyz&token_type=example&expires_in=3600
浏览器向资源服务器发出请求,其中不包括上一步收到的Hash值。
资源服务器返回一个网页,其中包含的代码可以获取Hash值中的令牌。
浏览器执行上一步获得的脚本,提取出令牌。
浏览器将令牌发给客户端。
密码模式(resource owner password credentials)
用户向客户端提供自己的用户名和密码。客户端使用这些信息,向"服务商提供商"索要授权。
用户必须把自己的密码给客户端,但是客户端不得储存密码。
这通常用在用户对客户端高度信任的情况下,比如客户端是操作系统的一部分,或者由一个著名公司出品。而认证服务器只有在其他授权模式无法执行的情况下,才能考虑使用这种模式。
流程
用户向客户端提供用户名和密码。
客户端将用户名和密码发给认证服务器,向后者请求令牌。
客户端发出的HTTP请求
- grant_type:表示授权类型,此处的值固定为"password",必选项。
- username:表示用户名,必选项。
- password:表示用户的密码,必选项。
- scope:表示权限范围,可选项。
样例
textPOST /token HTTP/1.1 Host: server.example.com Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW Content-Type: application/x-www-form-urlencoded grant_type=password&username=johndoe&password=A3ddj3w
认证服务器确认无误后,向客户端提供访问令牌。
认证服务器向客户端发送访问令牌
textHTTP/1.1 200 OK Content-Type: application/json;charset=UTF-8 Cache-Control: no-store Pragma: no-cache { "access_token":"2YotnFZFEjr1zCsicMWpAA", "token_type":"example", "expires_in":3600, "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter":"example_value" }
客户端模式(client credentials)
客户端以自己的名义,而不是以用户的名义,向"服务提供商"进行认证
严格地说,客户端模式并不属于OAuth框架所要解决的问题。在这种模式中,用户直接向客户端注册,客户端以自己的名义要求"服务提供商"提供服务,其实不存在授权问题。
流程
客户端向认证服务器进行身份认证,并要求一个访问令牌。
客户端发出的HTTP请求
- granttype:表示授权类型,此处的值固定为"clientcredentials",必选项。
- scope:表示权限范围,可选项。
样例
textPOST /token HTTP/1.1 Host: server.example.com Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW Content-Type: application/x-www-form-urlencoded grant_type=client_credentials
认证服务器确认无误后,向客户端提供访问令牌。
认证服务器向客户端发送访问令牌
textHTTP/1.1 200 OK Content-Type: application/json;charset=UTF-8 Cache-Control: no-store Pragma: no-cache { "access_token":"2YotnFZFEjr1zCsicMWpAA", "token_type":"example", "expires_in":3600, "example_parameter":"example_value" }
更新令牌
如果用户访问的时候,客户端的"访问令牌"已经过期,则需要使用"更新令牌"申请一个新的访问令牌。
客户端发出更新令牌的HTTP请求
- granttype:表示使用的授权模式,此处的值固定为"refreshtoken",必选项。
- refresh_token:表示早前收到的更新令牌,必选项。
- scope:表示申请的授权范围,不可以超出上一次申请的范围,如果省略该参数,则表示与上一次一致。
样例
textPOST /token HTTP/1.1 Host: server.example.com Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW Content-Type: application/x-www-form-urlencoded grant_type=refresh_token&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA
客户端使用上一步获得的授权,向认证服务器申请令牌。
认证服务器对客户端进行认证以后,确认无误,同意发放令牌。
客户端使用令牌,向资源服务器申请获取资源。
资源服务器确认令牌无误,同意向客户端开放资源。
10万条数据批量插入
参考链接:
详解:
循环逐条插入(效率最低)
反复获取 Connection 以及释放 Connection 会耗费大量时间,效率低
批处理模式逐条插入,只用这一个 SqlSession
5w 条数据插入耗时 901 毫秒
mapper
java@Mapper public interface UserMapper { Integer addUserOneByOne(User user); }
XML
xml<insert id="addUserOneByOne"> insert into user (username,address,password) values (#{username},#{address},#{password}) </insert>
service
java@Service public class UserService extends ServiceImpl<UserMapper, User> implements IUserService { private static final Logger logger = LoggerFactory.getLogger(UserService.class); @Autowired UserMapper userMapper; @Autowired SqlSessionFactory sqlSessionFactory; @Transactional(rollbackFor = Exception.class) public void addUserOneByOne(List<User> users) { SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH); UserMapper um = session.getMapper(UserMapper.class); long startTime = System.currentTimeMillis(); for (User user : users) { um.addUserOneByOne(user); } session.commit(); long endTime = System.currentTimeMillis(); logger.info("一条条插入 SQL 耗费时间 {}", (endTime - startTime)); } }
test
java/** * * 单元测试加事务的目的是为了插入之后自动回滚,避免影响下一次测试结果 * 一条一条插入 */ @Test @Transactional void addUserOneByOne() { List<User> users = new ArrayList<>(); for (int i = 0; i < 50000; i++) { User u = new User(); u.setAddress("广州:" + i); u.setUsername("张三:" + i); u.setPassword("123:" + i); users.add(u); } userService.addUserOneByOne(users); }
拼接为长sql插入
插入 5 万条数据耗时 1805 毫秒
需要修改 MySQL 配置或者对待插入数据进行分片
mapper
java@Mapper public interface UserMapper { void addByOneSQL(@Param("users") List<User> users); }
xml
xml<insert id="addByOneSQL"> insert into user (username,address,password) values <foreach collection="users" item="user" separator=","> (#{user.username},#{user.address},#{user.password}) </foreach> </insert>
service
java@Service public class UserService extends ServiceImpl<UserMapper, User> implements IUserService { private static final Logger logger = LoggerFactory.getLogger(UserService.class); @Autowired UserMapper userMapper; @Autowired SqlSessionFactory sqlSessionFactory; @Transactional(rollbackFor = Exception.class) public void addByOneSQL(List<User> users) { long startTime = System.currentTimeMillis(); userMapper.addByOneSQL(users); long endTime = System.currentTimeMillis(); logger.info("合并成一条 SQL 插入耗费时间 {}", (endTime - startTime)); } }
分页+拼接插入
MyBatis Plus -> saveBatch()
java@Transactional(rollbackFor = Exception.class) @Override public boolean saveBatch(Collection<T> entityList, int batchSize) { String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE); return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity)); }