package LRMF
import (
"context"
"encoding/json"
"fmt"
"reflect"
"sort"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/pkg/errors"
)
type instanceState int
const (
StateIdle instanceState = iota
StateRevoke
StateAssign
)
func (s instanceState) String() string {
switch s {
case StateIdle:
return "idle"
case StateRevoke:
return "revoke"
case StateAssign:
return "assign"
}
panic(fmt.Sprintf("Unexpected state %d", s))
}
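// rbPhaseOrderSketch is a hypothetical illustration, not part of the package:
// per the rb flow driven by leaderHandleRb below, the state node cycles
// idle -> revoke -> assign -> idle within one generation, and this helper just
// names the next phase in that cycle.
func rbPhaseOrderSketch(s instanceState) instanceState {
	switch s {
	case StateIdle:
		return StateRevoke
	case StateRevoke:
		return StateAssign
	default:
		return StateIdle
	}
}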
const (
// TTL of the hb node in etcd. Raising it tolerates more hb jitter and lowers rb frequency, at the cost of a longer failure window. 15s is generally acceptable (sarama's default); 10 minutes clearly is not, but second-level precision is unnecessary.
defaultSessionTimeout = 15
defaultRbTimeout int64 = 30
// Default wait before retrying after an error.
// Kafka has many timeout settings; what is certain is that minute-level waits are unacceptable.
defaultOpWaitTimeout = 1 * time.Second
)
type Coordinator struct {
// e.g. Kafka
protocol string
// service name, used to integrate with the naming service
biz string
// https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership/#unnecessary-rebalance
// Provided by the client to identify an individual resource. When the coordinator finds tasks already assigned, it returns the recorded historical assignment directly; the scenario described in the blog does not need to trigger a rebalance.
// Scenarios that do trigger a rebalance:
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
// The reason for triggering rebalance when leader rejoins is because there might be assignment protocol change (for example if the consumer group is using regex subscription and new matching topics show up).
// A leader rejoin requires a rebalance, which also happens under rolling bounces, but static membership keeps the startup of other nodes from triggering one.
instanceId string
// backed by etcd
etcdWrapper *etcdWrapper
// Generation marker: the generation this instance is currently in. In etcd each g's storage is isolated by directory, and instances cooperate within a g's space to complete task assignment.
curG *G
// channel to the workers
taskHub TaskHub
// strategy for mapping resources to instances
assignor Assignor
// source from which the leader fetches tasks
taskProvider TaskProvider
// Manages the coordinator's internal goroutines:
// 1. leaderCamp: leader election; the winner coordinates the overall rb process
// 2. hb: how an instance keeps itself alive, using clientv3's machinery to interact with etcd
// 3. watchG: instances watch rb events; the leader watches instance liveness (inside leaderCamp), and only the leader can trigger an rb
gracefulCancelFunc context.CancelFunc
gracefulWG sync.WaitGroup
// The same WorkerCoordinator object must not join the group more than once; absent framework bugs, this self-heals.
mu sync.Mutex
// guards against re-entering JoinGroup
joined bool
// During rb the leader uses the leaseID on the state node to prevent split brain.
newG *G // for the leader
// Building node liveness around the Session from the start might have been the better choice. Right now this node's liveness affects:
// 1. task reclamation
// 2. automatic expiry of the lock when triggering rb
// On 1:
// etcd's lease mechanism manages the task nodes tied to a particular instance; once the instance can no longer keep the lease alive (network trouble or its own failure), the task nodes tied to
// that lease are released.
// Tasks are tied to instance liveness, and liveness is marked in etcd by the hb node, so tasks share the same lease as hb.
// Note:
// 1. An instance being dead from etcd's point of view does not mean its process has stopped the assigned work; lrmf is a helper library and must not interfere with the host application.
// 2. With Kafka the broker knows an instance is dead and can stop it from fetching messages / committing offsets; lrmf, as a third-party library, cannot.
s *concurrency.Session
// Multi-tenancy: lets specific tasks be isolated on the go worker cluster. In production, core tasks, or tasks whose concurrency differs markedly from the rest, can get dedicated deployments via tenancy.
// TODO Traffic differences between tasks could be addressed in the task-splitting algorithm, so that this metric fluctuates within a narrower range across cluster machines.
tenancy string
}
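// newCoordinatorSketch is a hypothetical sketch, not part of the package API
// (no constructor appears in this excerpt): it only illustrates which
// collaborators a Coordinator needs before JoinGroup can be called. Any wiring
// beyond these fields (protocol, biz, tenancy) is an assumption.
func newCoordinatorSketch(w *etcdWrapper, hub TaskHub, a Assignor, tp TaskProvider, instanceId string) *Coordinator {
	return &Coordinator{
		instanceId:   instanceId,
		etcdWrapper:  w,
		taskHub:      hub,
		assignor:     a,
		taskProvider: tp,
	}
}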
func (c *Coordinator) JoinGroup(ctx context.Context) error {
/*
When rebalance is triggered:
1 workers are added or removed (rolling bounces must be considered here; static membership is introduced for that)
2 the leader is like any worker, except a leader leaving costs more (the leader watches instance hb as well as g)
3 an etcd outage does not affect task processing, but during the outage there is no rebalance, and restarting an instance then causes a business outage
4 in mq-specific scenarios, partitions are added (the integrating business must guarantee this itself; task changes need an interface exposed to the leader)
lrmf only cares about changes to resources (usually process resources)
*/
// Guard against the business layer calling this multiple times on the same coordinator.
c.mu.Lock()
defer c.mu.Unlock()
if c.joined {
return errors.New("FAILED to call join group, already running")
}
c.joined = true
// The leader/follower role can change at runtime and has its own cancel; the other goroutines live for the coordinator's lifetime, so a Close method can be exposed.
cancelCtx, cancelFunc := context.WithCancel(ctx)
c.gracefulCancelFunc = cancelFunc
c.gracefulWG.Add(2)
go withRecover(cancelCtx, c.leaderCamp)
go withRecover(cancelCtx, c.watchG)
return nil
}
func (c *Coordinator) Close(ctx context.Context) {
if c.gracefulCancelFunc != nil {
c.gracefulCancelFunc()
c.gracefulWG.Wait()
}
Logger.Printf("coordinator %s exit on g %s", c.instanceId, c.curG.String())
}
func (c *Coordinator) TriggerRb(ctx context.Context) error {
if err := c.tryTriggerRb(ctx); err != nil {
return errors.Wrap(err, "FAILED to TriggerRb")
}
return nil
}
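// runCoordinatorSketch is a hypothetical usage sketch, not part of the package:
// it shows the intended lifecycle — JoinGroup starts the leader-election and
// watchG goroutines, and Close cancels them and waits for a graceful exit. The
// *Coordinator and the stop channel are assumed to come from the caller.
func runCoordinatorSketch(ctx context.Context, c *Coordinator, stop <-chan struct{}) error {
	if err := c.JoinGroup(ctx); err != nil {
		return err
	}
	// The application runs; tasks are handed over and revoked via the TaskHub.
	<-stop
	c.Close(ctx)
	return nil
}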
type G struct {
// The generation's Id, distinct from instanceId.
// An int is chosen because operations on the generation must be strictly ordered and never roll back.
Id int64 `json:"id"`
// The instances allowed to participate in this rb.
Participant []string `json:"participant"`
// Start time, used to compute the rb timeout.
Timestamp int64 `json:"timestamp"`
}
func (g *G) String() string {
b, _ := json.Marshal(g)
return string(b)
}
func (g *G) LeaseID() clientv3.LeaseID {
return clientv3.LeaseID(g.Id)
}
func ParseG(ctx context.Context, val string) *G {
g := G{}
if err := json.Unmarshal([]byte(val), &g); err != nil {
// Some errors should be impossible; if one occurs, panic rather than letting the program continue.
panic(fmt.Sprintf("Unexpected err: FAILED to unmarshal %s, err %s", val, err.Error()))
}
return &g
}
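// gRoundTripSketch is a hypothetical illustration, not part of the package: it
// shows the JSON shape that G.String produces and ParseG consumes, e.g.
// {"id":1,"participant":["inst-0","inst-1"],"timestamp":...}.
func gRoundTripSketch(ctx context.Context) {
	g := &G{Id: 1, Participant: []string{"inst-0", "inst-1"}, Timestamp: time.Now().Unix()}
	parsed := ParseG(ctx, g.String())
	Logger.Printf("round-tripped g: %s", parsed.String())
}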
func (c *Coordinator) leaderCamp(ctx context.Context) {
// Reference: https://github.com/entertainment-venue/lrmf/wiki/etcd-clientv3%E7%AB%9E%E4%BA%89leader%E6%9C%BA%E5%88%B6
defer c.gracefulWG.Done()
for {
tryCampaign:
select {
case <-ctx.Done():
Logger.Printf("leaderCamp exit, when tryCampaign")
return
default:
}
// With the unlimited keepalive guarantee the leader normally does not change; a restart may cause re-election.
session, err := concurrency.NewSession(c.etcdWrapper.etcdClientV3, concurrency.WithTTL(defaultSessionTimeout))
if err != nil {
Logger.Printf("err %+v", err)
time.Sleep(defaultOpWaitTimeout)
goto tryCampaign
}
c.s = session
election := concurrency.NewElection(c.s, c.etcdWrapper.nodeRbLeader())
if err := election.Campaign(ctx, c.instanceId); err != nil {
Logger.Printf("err %+v", err)
time.Sleep(defaultOpWaitTimeout)
goto tryCampaign
}
Logger.Printf("Successfully campaign for current instance %s", c.instanceId)
// Fetch the hb node's revision first, so instances added/removed between leaderHandleRb and watchHb are not missed.
fetchStartRevision:
select {
case <-ctx.Done():
Logger.Printf("leaderCamp exit, when fetchStartRevision")
return
default:
}
resp, err := c.etcdWrapper.get(ctx, c.etcdWrapper.nodeRbLeader(), []clientv3.OpOption{clientv3.WithPrefix()})
if err != nil {
Logger.Printf("err %+v", err)
time.Sleep(defaultOpWaitTimeout)
goto fetchStartRevision
}
startRevision := resp.Header.Revision
// Calling campaign again would block under a new identity, so this must wait on s.Done().
tryTriggerRb:
select {
case <-c.s.Done():
Logger.Printf("leader session done, tryCampaign again")
goto tryCampaign
case <-ctx.Done():
if err := c.s.Close(); err != nil {
Logger.Printf("err %+v", err)
}
return
default:
}
if err := c.tryTriggerRb(ctx); err != nil {
Logger.Printf("err %+v", err)
time.Sleep(defaultOpWaitTimeout)
goto tryTriggerRb
}
// Start supervising the rb.
if err := c.leaderHandleRb(ctx); err != nil {
if !errors.Is(err, errClose) {
Logger.Printf("Unexpected err: %+v", err)
}
time.Sleep(defaultOpWaitTimeout)
goto tryTriggerRb
} else {
Logger.Printf("leader handle rb completed g [%s]", c.newG.String())
}
// The leader needs to check rb and watch hb.
if err := c.watchInstances(ctx, startRevision, c.s.Done()); err != nil {
Logger.Printf("Leader watch hb err: %+v", err)
}
// watchHb blocks on s.Done(); reaching this point means a cleanup can be triggered.
if err := election.Resign(ctx); err != nil {
Logger.Printf("FAILED to resign, err: %+v", err)
}
}
}
func (c *Coordinator) watchG(ctx context.Context) {
defer c.gracefulWG.Done()
Logger.Print("Start watch g")
// If the g we fetched is no longer the latest once assignment completes, the Watch below joins the subsequent rbs.
// After fetching g we read the current state; if an rb is in progress, the current g is considered invalid and we try to join the rb events after it.
tryStatic:
// Every goto target needs a Done check, so that an etcd outage cannot leave the goroutine unable to honor an explicit shutdown request.
select {
case <-ctx.Done():
Logger.Printf("watchG exit when tryStatic")
return
default:
}
rev, err := c.staticMembership(ctx)
if err != nil {
Logger.Printf("FAILED to static, err: %+v", err)
time.Sleep(defaultOpWaitTimeout)
goto tryStatic
}
var opts []clientv3.OpOption
opts = append(opts, clientv3.WithFilterDelete())
opts = append(opts, clientv3.WithRev(rev))
var (
wg sync.WaitGroup
cancelFunc context.CancelFunc
)
_, cancelFunc = context.WithCancel(context.TODO())
tryWatch:
wch := c.etcdWrapper.etcdClientV3.Watch(ctx, c.etcdWrapper.nodeGId(), opts...)
for {
select {
case <-ctx.Done():
Logger.Printf("watchG exit")
// watchG is exiting; try to stop its child goroutine.
cancelFunc()
wg.Wait()
return
case wr := <-wch:
if err := wr.Err(); err != nil {
Logger.Printf("FAILED to watchG, err: %+v", err)
// bugfix: the g may have been created long ago with infrequent rbs; a newly started node begins from the last create, so in theory finding a usable rev should not take long.
if err.Error() == "etcdserver: mvcc: required revision has been compacted" {
rev++
}
goto tryWatch
}
var (
rb bool
gRaw []byte
)
for _, ev := range wr.Events {
rev = ev.Kv.ModRevision + 1
rb = true
gRaw = ev.Kv.Value
Logger.Printf("Got watchG ev [%s]", ev.Kv.String())
}
if rb {
Logger.Printf("Use g %s", string(gRaw))
newG := G{}
if err := json.Unmarshal(gRaw, &newG); err != nil {
// Only the g id node used by triggerRb stores a G-typed JSON string; anything else is simply ignored.
Logger.Printf("FAILED to unmarshal value %s, may not be an rb event, err %s", gRaw, err)
continue
}
// A new rb has arrived: force-stop the current one and start handling the new rb.
cancelFunc()
wg.Wait()
// Replacing g must wait until the rb goroutine has been reclaimed.
c.curG = &newG
var cancelCtx context.Context
cancelCtx, cancelFunc = context.WithCancel(context.TODO())
wg.Add(1)
go c.handleRbEvent(cancelCtx, &wg)
}
}
}
}
func (c *Coordinator) staticMembership(ctx context.Context) (int64, error) {
/*
Unknown errors are handed to the caller to retry. If we gave up instead, the cluster would sit idle and some partitions would go unconsumed for a long time, triggering lag alerts.
*/
gNode := c.etcdWrapper.nodeGId()
var (
rev int64 = -1
opts []clientv3.OpOption
)
opts = append(opts, clientv3.WithLastRev()...)
resp, err := c.etcdWrapper.get(ctx, gNode, opts)
if err != nil {
return -1, errors.Wrapf(err, "FAILED to get %s, err %+v", gNode, err)
}
rev = resp.Header.Revision
if resp.Count == 0 {
Logger.Printf("g empty, return ASAP")
return rev, nil
}
// If the latest g contains this instance's id, attempt static membership.
// If, right after we fetch g, the leader triggers an rb that produces a new g, the task nodes prevent tasks from running in parallel, and we wait for the next rb.
g := G{}
gValue := string(resp.Kvs[0].Value)
if err := json.Unmarshal([]byte(gValue), &g); err != nil {
return -1, errors.Errorf("FAILED to unmarshal, err: %+v value: %s", err, gValue)
}
var exist bool
for _, p := range g.Participant {
if p == c.instanceId {
exist = true
break
}
}
if !exist {
// This instance is not included; handleRbEvent starts from the next g.
Logger.Printf("instance %s not exist in participant %+v", c.instanceId, g.Participant)
rev++
return rev, nil
}
/*
If an rb is in progress, abandon the current static membership and leave it to handleRbEvent to handle (or skip) the last g. Two cases can occur:
1 the g fetched here and the state are not a pair (the state's leaseID is not the g's Id): watch from the next event onwards and do not join the current rb
2 unlike 1, watch from the current rev, since the last g corresponding to the current g still needs handling
*/
rawState, err := c.etcdWrapper.getState(ctx)
if err != nil {
return -1, errors.Wrapf(err, "FAILED to get state, err %+v", err)
}
state, leaseID := stateValue(rawState).StateAndLeaseID()
if leaseID != g.Id {
if leaseID > g.Id {
// A new g appeared after we fetched g; leave it to handleRbEvent.
Logger.Printf("Maybe new g with state %s, latest g %s", rawState, g.String())
// With a new g, processing should start from the next g; it may not be the latest either, but the goal is to reach balance as soon as possible.
rev++
return rev, nil
}
// The leaseID is stale; try joining the rb from the current g. In triggerRb the state is modified first and the g added afterwards, so in theory this cannot happen.
// TODO manual intervention
Logger.Printf("Error leaseID in state %s, latest g %s", rawState, g.String())
return rev, nil
} else {
if state != StateIdle.String() {
Logger.Printf("Latest g %s rebalancing, try to handle", g.String())
// 最后的g可能正在rb,尝试加入
return rev, nil
}
// This g can be trusted, though the leader may still trigger an rb immediately; the rev we recorded lets us detect subsequent rbs, which will require this instance to join.
}
assignNode := c.etcdWrapper.nodeGAssignInstanceId(g.Id, c.instanceId)
gResp, gErr := c.etcdWrapper.get(ctx, assignNode, nil)
if gErr != nil {
return -1, errors.Wrapf(gErr, "FAILED to get %s, err %+v", assignNode, gErr)
}
if gResp.Count == 0 {
// We fetched g first and then saw state idle, which proves g was stable; yet there is no assign node, so g may have been cleaned up by a newer rb. Just try to join from the next rb.
rev++
return rev, nil
}
assignment := string(gResp.Kvs[0].Value)
if err := c.assign(ctx, assignment); err != nil {
// This may partially fail because an rb is in progress, which fails the current rb handling and moves us on to the next rb.
return -1, errors.Wrapf(err, "FAILED to assign %s, err: %+v", assignment, err)
}
c.curG = &g
// Watch from the next revision onwards.
rev++
return rev, nil
}
func (c *Coordinator) handleRbEvent(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
/*
A follower checks whether Participant contains itself:
1 contained: proceed with the rb directly
2 not contained, two cases to consider:
  a new instance (ignore; the leader will notice an available instance via hb and trigger an rb);
  an instance mid-task whose hb is delayed: watchG notices in time, stops the existing tasks, and relies on the task nodes to prevent tasks from running in parallel.
*/
var exist bool
for _, instanceId := range c.curG.Participant {
if instanceId == c.instanceId {
exist = true
break
}
}
if !exist {
Logger.Printf("FAILED to find instanceId %s in g's Participant %+v", c.instanceId, c.curG.Participant)
// Revoke must succeed, otherwise conflicting tasks will be detected.
c.waitAdjustAssignment(ctx, "", c.revoke)
return
}
/*
The revoke -> assign phases must be globally serialized.
*/
if err := c.waitState(ctx, StateRevoke.String()); err != nil {
if !errors.Is(err, errClose) {
Logger.Printf("Unexpected err: %+v", err)
}
return
}
Logger.Printf("Instance %s waitState success %s", c.instanceId, StateRevoke)
if err := c.instanceHandleRb(ctx, StateRevoke.String()); err != nil {
if !errors.Is(err, errClose) {
Logger.Printf("Unexpected err: %+v", err)
}
return
}
Logger.Printf("Instance %s instanceHandleRb completed %s", c.instanceId, StateRevoke)
if err := c.waitState(ctx, StateAssign.String()); err != nil {
if !errors.Is(err, errClose) {
Logger.Printf("Unexpected err: %+v", err)
}
return
}
Logger.Printf("Instance %s waitState success %s", c.instanceId, StateAssign)
if err := c.instanceHandleRb(ctx, StateAssign.String()); err != nil {
if !errors.Is(err, errClose) {
Logger.Printf("Unexpected err: %+v", err)
}
return
}
Logger.Printf("Instance %s instanceHandleRb completed %s", c.instanceId, StateAssign)
if err := c.waitCompareAndSwap(
ctx,
c.curG,
c.etcdWrapper.nodeGJoinInstance(c.curG.Id),
StateAssign.String(),
StateIdle.String()); err != nil {
if !errors.Is(err, errClose) {
Logger.Printf("Unexpected err: %+v", err)
}
return
}
Logger.Printf("rb completed on %s, g [%s]", c.instanceId, c.curG.String())
}
func (c *Coordinator) leaderHandleRb(ctx context.Context) error {
/*
The leader starts working at two points:
1 in triggerRb, when an hb node is added/removed, the instance nodes have changed and no rb is currently in progress
2 leaderWatcher notices the leader going offline and triggers recognizeRole; the leader coming back online triggers an rb
*/
participant := c.newG.Participant
tasks, err := c.taskProvider.Tasks(ctx)
if err != nil {
return errors.Wrap(err, "FAILED to get tasks")
}
instanceIdAndTasks, err := c.assignor.PerformAssignment(ctx, tasks, participant)
if err != nil {
// If assignment fails, return directly and wait for the rb timeout.
return errors.Wrap(err, "FAILED to perform assignment")
}
// Write the new assignment results to the assign nodes.
for instanceId, tasks := range instanceIdAndTasks {
b, err := json.Marshal(tasks)
if err != nil {
return errors.Wrapf(err, "FAILED to marshal tasks %+v", tasks)
}
newAssignment := string(b)
assignNode := c.etcdWrapper.nodeGAssignInstanceId(c.newG.Id, instanceId)
tryPut:
if err := c.etcdWrapper.put(ctx, assignNode, newAssignment); err != nil {
err := errors.Wrapf(err, "FAILED to put %s to %s", newAssignment, assignNode)
Logger.Printf("err: %+v", err)
select {
case <-ctx.Done():
Logger.Printf("leaderHandleRb tryPut exit")
return err
default:
}
time.Sleep(defaultOpWaitTimeout)
goto tryPut
}
Logger.Printf("Successfully add assignment %s to node %s", newAssignment, assignNode)
}
state := StateIdle
for {
select {
case <-ctx.Done():
Logger.Printf("Instance %s leaderHandleRb exit", c.instanceId)
return errors.Wrap(errClose, "")
case <-time.After(defaultOpWaitTimeout):
switch state {
case StateIdle:
// Move the rb state from idle to revoke (meaning g/join/gid/ starts accepting revokes), then wait for every instance to reach revoke.
if err := c.waitCompareAndSwap(
ctx,
c.newG,
c.etcdWrapper.nodeRbState(),
formatStateValue(StateIdle.String(), c.newG.LeaseID()),
formatStateValue(StateRevoke.String(), c.newG.LeaseID())); err != nil {
return err
}
if err := c.waitInstanceState(ctx, StateRevoke.String(), participant); err != nil {
return err
}
state = StateRevoke
case StateRevoke:
// Once all instances have joined revoke, transition the cluster's rb state to assign.
if err := c.waitCompareAndSwap(
ctx,
c.newG,
c.etcdWrapper.nodeRbState(),
formatStateValue(StateRevoke.String(), c.newG.LeaseID()),
formatStateValue(StateAssign.String(), c.newG.LeaseID())); err != nil {
return errors.Wrap(err, "Quit waitCompareAndSwap, from revoke to assign")
}
// Wait for all instances to become idle.
if err := c.waitInstanceState(ctx, StateIdle.String(), participant); err != nil {
return errors.Wrap(err, "Quit waitInstanceState when waiting idle")
}
// Restore the rb state to idle.
if err := c.waitCompareAndSwap(
ctx,
c.newG,
c.etcdWrapper.nodeRbState(),
formatStateValue(StateAssign.String(), c.newG.LeaseID()),
formatStateValue(StateIdle.String(), c.newG.LeaseID())); err != nil {
return errors.Wrap(err, "Quit waitCompareAndSwap from assign to idle")
}
return nil
}
}
}
}
func (c *Coordinator) instanceHandleRb(ctx context.Context, curState string) error {
assignNode := c.etcdWrapper.nodeGAssignInstanceId(c.curG.Id, c.instanceId)
resp, err := c.etcdWrapper.get(ctx, assignNode, nil)
if err != nil {
return errors.Wrapf(err, "FAILED to get %s", assignNode)
}
var (
gotAssignment bool
assignment string
)
if resp.Count == 0 {
// No tasks were assigned to this instance; succeed immediately so the leader's rb is not held up.
Logger.Printf("%s got no assignment", assignNode)
} else {
gotAssignment = true
assignment = string(resp.Kvs[0].Value)
}
joinNode := c.etcdWrapper.nodeGJoinInstance(c.curG.Id)
firstTry := true
joinOp:
select {
case <-ctx.Done():
Logger.Printf("Instance %s instanceHandleRb exit when join revoke", c.instanceId)
return errors.Wrap(errClose, "")
default:
if !firstTry {
time.Sleep(defaultOpWaitTimeout)
}
}
firstTry = false
switch curState {
case StateRevoke.String():
// In the revoke phase, join group creates a revoke node under this instance in the new g, and the leader collects the first-phase join group requests.
// A revoke node should not already exist; if it does, this follower's join group failed and it retries. The leader expects to collect this follower, so if it cannot join, we have to wait for the rb timeout.
if gotAssignment {
// Once in revoke, start revoking our own tasks according to the assignment.
c.waitAdjustAssignment(ctx, assignment, c.revoke)
}
// Local revoke is done; mark this instance's revoke as complete.
curValue, err := c.etcdWrapper.createAndGet(ctx, joinNode, StateRevoke.String(), clientv3.NoLease)
if err != nil {
if err != errNodeExist {
Logger.Printf("FAILED to createAndGet %s, unexpected err %+v", joinNode, err)
goto joinOp
}
if curValue != StateRevoke.String() {
// Give up here and let the leader deem the current g's rb failed (it cannot collect all join group requests in time) and start the next round. This should not happen absent a bug or an etcd interaction anomaly.
// Only one goroutine should ever operate on this instanceId node; a wrong state means another goroutine touched it, i.e. a bug that needs manual intervention to fix.
return errors.Errorf("FAILED to createAndGet %s, unexpected value %s, quit join revoke", joinNode, curValue)
}
// Values match: just continue; task assignment is idempotent, so repeated assignment is harmless.
}
Logger.Printf("Successfully join revoke, instance %s", c.instanceId)
return nil
case StateAssign.String():
if gotAssignment {
c.waitAdjustAssignment(ctx, assignment, c.assign)
}
// Move the instance's state from revoke to assign.
if err := c.etcdWrapper.compareAndSwap(ctx, joinNode, StateRevoke.String(), StateAssign.String(), -1); err != nil {
if err == errNodeValueErr {
return errors.Errorf("FAILED to compareAndSwap %s from revoke to assign, unexpected err %s, quit join assign", joinNode, err)
}
if err != errNodeValueAlreadyExist {
Logger.Printf("FAILED to compareAndSwap %s from revoke to assign , unexpected err %+v", joinNode, err)
goto joinOp
}
// Already in the assign state; just continue with the re-entrant code below.
}
Logger.Printf("Successfully join assign, instance %s", c.instanceId)
return nil
default:
return errors.Errorf("Unknown state %s", curState)
}
}
// waitState waits for the state node's value to become target.
func (c *Coordinator) waitState(ctx context.Context, target string) error {
tryGetState:
resp, err := c.etcdWrapper.get(ctx, c.etcdWrapper.nodeRbState(), nil)
if err != nil {
Logger.Printf("FAILED to get state, err %+v", err)
select {
case <-ctx.Done():
Logger.Printf("waitState exit when %s", target)
return errors.Errorf("unexpected exit when wait %s", target)
default:
}
goto tryGetState
}
if resp.Count > 0 {
v := stateValue(resp.Kvs[0].Value)
state, leaseID := v.StateAndLeaseID()
// Both state and leaseID must match exactly, to keep the instance out of a stale rb.
if leaseID == c.curG.Id {
if state == target {
Logger.Printf("State change to %s, return ASAP", target)
return nil
}
} else {
// The rb reaches this wait for revoke. If the leader starts a new rb that excludes this instance, the state still moves to revoke; this instance should recognize the rb event as soon as possible and revoke all tasks in handleRbEvent.
return errors.Errorf("FAILED to wait %s, because state's leaseID %d differs from cur g's id %d", target, leaseID, c.curG.Id)
}
}
node := c.etcdWrapper.nodeRbState()
tryWatch:
wch := c.etcdWrapper.etcdClientV3.Watch(ctx, node, clientv3.WithRev(resp.Header.Revision), clientv3.WithFilterDelete())
for {
select {
case <-ctx.Done():
Logger.Printf("Instance %s goroutine exit when wait state %s", c.instanceId, target)
return errors.Wrap(errClose, "")
case wr := <-wch:
if err := wr.Err(); err != nil {
Logger.Printf("FAILED to waitState, err: %+v", err)
goto tryWatch
}
for _, ev := range wr.Events {
v := stateValue(ev.Kv.Value)
state, _ := v.StateAndLeaseID()
if state == target {
Logger.Printf("Got waitState ev [%s]", ev.Kv.String())
return nil
}
}
}
}
}
func (c *Coordinator) waitInstanceState(ctx context.Context, state string, participant []string) error {
// Collect the join revoke requests from all instances, entering the revoke state.
// Collect the current and existing instances' assignments; new assignments are written during revoke.
firstTry := true
wait:
select {
case <-ctx.Done():
Logger.Printf("Instance %s waitInstanceState exit when wait %s", c.instanceId, state)
return errors.Wrap(errClose, "")
default:
if !firstTry {
time.Sleep(defaultOpWaitTimeout)
}
}
firstTry = false
// Return ASAP; the caller decides whether a new rb needs to be triggered.
if time.Now().Unix()-c.newG.Timestamp >= defaultRbTimeout {
return errors.Errorf("Rb timeout, start new g, cur g %+v", *c.newG)
}
instanceIdAndJoin, err := c.etcdWrapper.getRbInstanceIdAndJoin(ctx)
if err != nil {
Logger.Printf("FAILED to g join node, err %+v", err)
goto wait
}
if len(instanceIdAndJoin) == 0 {
Logger.Printf("Waiting instance join current g %+v on state %s", *c.newG, state)
goto wait
}
var (
instanceIds []string
foundUncompletedJoin bool
)
for instanceId, join := range instanceIdAndJoin {
if join != state {
// Some instance has not sent its join revoke yet: continue waiting.
// Workers normally create the revoke node directly, so a matching count is basically enough; this check guards against anomalous states.
Logger.Printf("Waiting for instance %s to send join %s request, current state %s", instanceId, state, join)
foundUncompletedJoin = true
}
instanceIds = append(instanceIds, instanceId)
}
if foundUncompletedJoin {
goto wait
}
sort.Strings(instanceIds)
sort.Strings(participant)
if !reflect.DeepEqual(instanceIds, participant) {
Logger.Printf(
"Value not equal, new g(%d) joined instanceId %+v, participant %+v",
c.newG.Id,
instanceIds,
participant,
)
goto wait
}
return nil
}
func (c *Coordinator) waitCompareAndSwap(ctx context.Context, g *G, node string, curValue string, newValue string) error {
firstTry := true
tryCompareAndSwap:
select {
case <-ctx.Done():
Logger.Printf(
"Instance %s waitCompareAndSwap exit, when change %s from %s to %s",
c.instanceId,
node,
curValue,
newValue)
return errors.Wrap(errClose, "")
default:
if !firstTry {
time.Sleep(defaultOpWaitTimeout)
}
}
firstTry = false
// Return ASAP; the caller decides whether a new rb needs to be triggered.
if time.Now().Unix()-g.Timestamp >= defaultRbTimeout {
return errors.Errorf("Rb timeout, g %+v", *g)
}
if err := c.etcdWrapper.compareAndSwap(ctx, node, curValue, newValue, -1); err != nil {
if err == errNodeValueErr {
return errors.Wrapf(err, "FAILED to compareAndSwap %s from %s to %s", node, curValue, newValue)
}
if err != errNodeValueAlreadyExist {
// unexpected err
Logger.Printf("FAILED to compareAndSwap %s from %s to %s, err: %+v", node, curValue, newValue, err)
time.Sleep(defaultOpWaitTimeout)
goto tryCompareAndSwap
}
// The node already holds the new value (e.g. revoke); continue.
}
return nil
}
func (c *Coordinator) waitAdjustAssignment(ctx context.Context, assignment string, fn func(ctx context.Context, assignment string) error) {
firstTry := true
tryAdjust:
if !firstTry {
time.Sleep(defaultOpWaitTimeout)
}
firstTry = false
select {
case <-ctx.Done():
Logger.Printf("waitAdjustAssignment exit, %s", assignment)
return
default:
}
if err := fn(ctx, assignment); err != nil {
Logger.Printf("FAILED to adjust assignment, err %+v", err)
goto tryAdjust
}
}
func (c *Coordinator) assign(ctx context.Context, assignment string) error {
// On failure, wait until the rb times out and the goroutine exits; alternatively, return directly and wait for the next rb.
// Adding a goto here could introduce retry behavior.
assignedTasks, err := c.taskHub.OnAssigned(ctx, assignment)
if err != nil {
return errors.Wrapf(err, "FAILED to assign %s, because err %s", assignment, err)
}
if len(assignedTasks) == 0 {
Logger.Print("No assigned tasks")
return nil
}
Logger.Printf("Instance %s successfully assign tasks, finish OnAssigned", c.instanceId)
for _, task := range assignedTasks {
node := c.etcdWrapper.nodeTaskId(task.Key(ctx))
// OnAssigned succeeding above means the tasks have been handed out; here we make sure the task nodes are created in etcd.
occupyTask:
// Use the leaseID so that when an instance goes down, its task nodes in etcd are cleared too, reducing the chance of occupyTask conflicts below.
taskOwnerInstance, err := c.etcdWrapper.createAndGet(ctx, node, c.instanceId, c.s.Lease())
if err == nil {
Logger.Printf("Successfully bind %s to instance %s", node, c.instanceId)
continue
}
if err != errNodeExist {
Logger.Printf("FAILED to createAndGet node %s, err %+v", node, err)
goto occupyTask
}
if taskOwnerInstance != c.instanceId {
if err := c.canIgnoreInstance(ctx, taskOwnerInstance); err != nil {
Logger.Printf("Unexpected err, node occupied by %s, %+v", taskOwnerInstance, err)
// Still force-delete; the err is only logged for troubleshooting.
}
if err := c.etcdWrapper.del(ctx, node); err != nil {
return errors.Wrap(err, "")
}
goto occupyTask
}
}
Logger.Printf("Instance %s successfully assign tasks, finish etcd ops", c.instanceId)
return nil
}
func (c *Coordinator) canIgnoreInstance(ctx context.Context, instanceId string) error {
hbInstanceIds, err := c.etcdWrapper.getInstanceIds(ctx)
if err != nil {
return errors.Wrap(err, "")
}
// no hb node for this instance
var exist bool
for _, id := range hbInstanceIds {
if id == instanceId {
exist = true
break
}
}
if !exist {
Logger.Printf("Instance %s can be ignored because no hb", instanceId)
return nil
}