-
Notifications
You must be signed in to change notification settings - Fork 15
/
Module.bsl
766 lines (548 loc) · 45.2 KB
/
Module.bsl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
#Область ПрограммныйИнтерфейс
// Новое описание соединения. создает описание соединения, из данных которого в дальнейшем будет инициализировано
// соединение, в котором так же хранится кеш соединения и лог выполненных операций.
//
// Параметры:
// БазовыйАдрес - Строка - Адрес подключения к rest proxy, например "http://localhost:8082"
// ФорматОбмена - Строка - Формат обмена, в котором должен происходить обмен с rest proxy,
// пока единственный поддерживемый это "json".
//
// Возвращаемое значение:
// Структура - Описание соединения:
// * Адрес - Строка - Строка базового адреса подключения к rest proxy
// * Формат - Строка - Формат обмена.
// * Заголовки - Соответствие - Заголовки необходимые для соединения с rest proxy, зависят от формата и операции.
// * КешСоединения - Неопределено, HTTPСоединение - кеш соединения, если подключение уже выполнялось.
// * РезультатСоединения - Структура -:
// ** ИсторияОпераций - Массив - История выполененных операций в рамках данного описания соединения.
// ** ИнформацияОбОперации - Строка - Текстовое представление информации о последней выполненной операции.
// ** ОшибкаВыполнения - Булево - Флаг наличия ошибки, после выполнения операции.
Функция НовоеОписаниеСоединения(БазовыйАдрес, ФорматОбмена = "json") Экспорт
ВозможныеФорматы = Новый Массив;
ВозможныеФорматы.Добавить("json");
ВозможныеФорматы.Добавить("binary");
МассивОпераций = Новый Массив();
РезультатВыполнения = Новый Структура;
РезультатВыполнения.Вставить("ИсторияОпераций", МассивОпераций);
РезультатВыполнения.Вставить("ИнформацияОбОперации", Неопределено);
РезультатВыполнения.Вставить("ОшибкаВыполнения", Ложь);
СвойстваСоединения = Новый Структура;
СвойстваСоединения.Вставить("Адрес", БазовыйАдрес);
СвойстваСоединения.Вставить("Формат", ФорматОбмена);
СвойстваСоединения.Вставить("Заголовки", Неопределено);
СвойстваСоединения.Вставить("КешСоединения", Неопределено);
СвойстваСоединения.Вставить("РезультатСоединения", РезультатВыполнения);
ВыполнитьКонтрольДопустимыхФорматов(СвойстваСоединения, ВозможныеФорматы);
ЗаполнитьЗаголовки(СвойстваСоединения, ФорматОбмена);
Возврат СвойстваСоединения;
КонецФункции
// Новый отправитель. Создает описание структуры отправителя, через которую должны выполняться
// дальнейшие операции взаимодействия с rest proxy.
//
// Параметры:
// ОписаниеСоединения - См. НовоеОписаниеСоединения
//
// Возвращаемое значение:
// Структура - Новый отправитель:
// * ОписаниеСоединения - См. НовоеОписаниеСоединения
// * СообщенияПоТопикам - Соответствие - Топики, по которым должны отправляться сообщения.
Функция НовыйОтправитель(ОписаниеСоединения) Экспорт
ОписаниеОтправителя = СвойстваОтправителя();
ОписаниеОтправителя.ОписаниеСоединения = ОписаниеСоединения;
ВыполнитьЛогированиеНовыйОтправитель(ОписаниеСоединения);
Возврат ОписаниеОтправителя;
КонецФункции
// Новый подписчик. Создает описание структуры подписчика, через которую должны выполняться
// дальнейшие операции взаимодействия с rest proxy.
//
// Параметры:
// ОписаниеСоединения - См. НовоеОписаниеСоединения
// ИмяГруппыПодписчиков - Строка - Имя группы подписчиков, к которой создается подписчик.
// ИмяЭкземпляраПодписчика - Строка - Имя экземпляра подписчика, можно не заполнять, тогда будет сгенерировано имя.
// ИспользоватьАвтоподтверждение - Булево - Использовать автоподтверждение, если "Истина", то после получения
// сообщений оффсет группы будет сдвигаться, если "Ложь",
// то подтверждение должно идти через ПодтвердитьПолучение().
// МаксимальныйРазмерОтветаБайт - Число - Максимальный размер ответа байт, размер порции в байтах, которую будет
// возвращать rest proxy, используется для ограничения выборки сообщений.
//
// Возвращаемое значение:
// Структура - Новый подписчик:
// * ОписаниеСоединения - См. НовоеОписаниеСоединения
// * ИмяГруппыПодписчиков - См. НовыйПодписчик
// * ИмяЭкземпляраПодписчика - См. НовыйПодписчик
// * ИспользоватьАвтоподтверждение - См. НовыйПодписчик
// * МаксимальныйРазмерОтветаБайт - См. НовыйПодписчик
Функция НовыйПодписчик( ОписаниеСоединения,
ИмяГруппыПодписчиков = Неопределено,
ИмяЭкземпляраПодписчика = Неопределено,
ИспользоватьАвтоподтверждение = Ложь,
МаксимальныйРазмерОтветаБайт = 100) Экспорт
СгенерироватьИмяПустомуЗначению(ИмяГруппыПодписчиков);
СгенерироватьИмяПустомуЗначению(ИмяЭкземпляраПодписчика);
ОписаниеПодписчика = СвойстваПодписчика();
ОписаниеПодписчика.ОписаниеСоединения = ОписаниеСоединения;
ОписаниеПодписчика.ИмяГруппыПодписчиков = ИмяГруппыПодписчиков;
ОписаниеПодписчика.ИмяЭкземпляраПодписчика = ИмяЭкземпляраПодписчика;
ОписаниеПодписчика.ИспользоватьАвтоподтверждение = ИспользоватьАвтоподтверждение;
ОписаниеПодписчика.МаксимальныйРазмерОтветаБайт = МаксимальныйРазмерОтветаБайт;
ВыполнитьЛогированиеНовыйПодписчик(ОписаниеСоединения);
Возврат ОписаниеПодписчика;
КонецФункции
// Зарегистрировать подписчика. Выполняет запрос к rest proxy на регистрацию подписчика.
//
// POST /consumers/(string:group_name)
//
// Параметры:
// Подписчик - См. НовыйПодписчик
Процедура ЗарегистрироватьПодписчика(Подписчик) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
Адрес = СтрСоединить(МассивАдреса, "/");
ПараметрыПодписчика = Новый Соответствие;
ПараметрыПодписчика.Вставить("name", Подписчик.ИмяЭкземпляраПодписчика);
ПараметрыПодписчика.Вставить("format", "json");
ПараметрыПодписчика.Вставить("auto.offset.reset", "earliest");
Если Не Подписчик.ИспользоватьАвтоподтверждение Тогда
ПараметрыПодписчика.Вставить("auto.commit.enable", "false");
КонецЕсли;
ТелоЗапроса = КоннекторHTTP.ОбъектВJson(ПараметрыПодписчика);
Если Не Подписчик.ИспользоватьАвтоподтверждение Тогда
ТелоЗапроса = СтрЗаменить(ТелоЗапроса, "auto_commit_enable", "auto.commit.enable");
КонецЕсли;
ТелоЗапроса = СтрЗаменить(ТелоЗапроса, "auto_offset_reset", "auto.offset.reset");
Ответ = КоннекторHTTP.Post(Адрес, ТелоЗапроса, Заголовки);
ВыполнитьЛогированиеЗарегистрироватьПодписчика(Подписчик, "Post", ТелоЗапроса, Ответ);
КонецПроцедуры
// Получить сдвиги. Получает информацию об оффсетах подписчика.
//
// GET /consumers/(string:group_name)/instances/(string:instance)/offsets
//
// Параметры:
// Подписчик - См. НовыйПодписчик
//
// Возвращаемое значение:
// Массив - сдвиги подписчика по группе консьюмеров
Функция ПолучитьСдвиги(Подписчик) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
МассивАдреса.Добавить("instances");
МассивАдреса.Добавить(Подписчик.ИмяЭкземпляраПодписчика);
МассивАдреса.Добавить("offsets");
Адрес = СтрСоединить(МассивАдреса, "/");
ОтветЗапроса = КоннекторHTTP.Get(Адрес, , Заголовки);
Сдвиги = Новый Массив;
Если ОтветЗапроса.КодСостояния = 200 Тогда
ПолученныеДанные = КоннекторHTTP.КакJson(ОтветЗапроса);
Если ПолученныеДанные.Количество() > 0 Тогда
Для каждого ЗаписьДанных Из ПолученныеДанные Цикл
Сдвиги.Добавить(ЗаписьДанных);
КонецЦикла;
КонецЕсли;
КонецЕсли;
ВыполнитьЛогированиеПолучитьСдвиги(Подписчик, "Get", "<empty>", ОтветЗапроса);
Возврат Сдвиги;
КонецФункции
// Получить сообщения. Получает сообщения в соответствии с настройками, с которыми был зарегистрирован подписчик.
//
// GET /consumers/(string:group_name)/instances/(string:instance)/records
//
// Параметры:
// Подписчик - См. НовыйПодписчик
//
// Возвращаемое значение:
// Массив - Сообщения, которые представляют из себя структуру со свойствами: partition, offset, value.
Функция ПолучитьСообщения(Подписчик) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
МассивАдреса.Добавить("instances");
МассивАдреса.Добавить(Подписчик.ИмяЭкземпляраПодписчика);
МассивАдреса.Добавить("records");
Адрес = СтрСоединить(МассивАдреса, "/");
ПараметрыЗапроса = Новый Структура;
Если ЗначениеЗаполнено(Подписчик.МаксимальныйРазмерОтветаБайт) Тогда
ПараметрыЗапроса.Вставить("max_bytes", Подписчик.МаксимальныйРазмерОтветаБайт);
КонецЕсли;
ОтветЗапроса = КоннекторHTTP.Get(Адрес, ПараметрыЗапроса, Заголовки);
МиниммальныйСдвиг = Неопределено;
МаксимальныйСдвиг = Неопределено;
Сообщения = Новый Массив;
Если ОтветЗапроса.КодСостояния = 200 Тогда
ПолученныеДанные = КоннекторHTTP.КакJson(ОтветЗапроса);
Если ПолученныеДанные.Количество() > 0 Тогда
ДатаСобытия = ТекущаяДата();
Для каждого ЗаписьДанных Из ПолученныеДанные Цикл
Сообщения.Добавить(ЗаписьДанных);
Партиция = ЗаписьДанных["partition"];
Сдвиг = ЗаписьДанных["offset"];
МиниммальныйСдвиг = ?(МиниммальныйСдвиг = Неопределено, Сдвиг, МиниммальныйСдвиг);
МаксимальныйСдвиг = ?(МаксимальныйСдвиг = Неопределено, Сдвиг, МаксимальныйСдвиг);
МиниммальныйСдвиг = Мин(МиниммальныйСдвиг,Сдвиг);
МаксимальныйСдвиг = Макс(МаксимальныйСдвиг, Сдвиг);;
КонецЦикла;
КонецЕсли;
КонецЕсли;
ВыполнитьЛогированиеПолучитьСообщения(Подписчик, "Get", ОтветЗапроса, МиниммальныйСдвиг, МаксимальныйСдвиг);
Возврат Сообщения;
КонецФункции
// Подтвердить получение. Используется когда автокоммит у получателя выключен.
//
// POST /consumers/(string:group_name)/instances/(string:instance)/offsets
//
// Параметры:
// Подписчик - См. НовыйПодписчик
// СвойстваПодтверждения - Структура
// * topic - Строка - Топик
// * partition - Строка - Номер партиция
// * offset - Строка - Оффсет
Процедура ПодтвердитьПолучение(Подписчик, СвойстваПодтверждения) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
МассивАдреса.Добавить("instances");
МассивАдреса.Добавить(Подписчик.ИмяЭкземпляраПодписчика);
МассивАдреса.Добавить("offsets");
Адрес = СтрСоединить(МассивАдреса, "/");
Оффсет = Новый Структура;
Оффсет.Вставить("topic", СвойстваПодтверждения.Топик);
Оффсет.Вставить("partition", СвойстваПодтверждения.Партиция);
Оффсет.Вставить("offset", СвойстваПодтверждения.Сдвиг);
МассивОффсетов = Новый Массив;
МассивОффсетов.Добавить(Оффсет);
ПараметрОффсетов = Новый Структура;
ПараметрОффсетов.Вставить("offsets", МассивОффсетов);
ПараметрОффсетов = КоннекторHTTP.ОбъектВJson(ПараметрОффсетов);
РезультатПодтверждениеОффсета = КоннекторHTTP.Post(Адрес, ПараметрОффсетов, Заголовки);
ВыполнитьЛогированиеПодтвердитьПолучение(Подписчик, "Post", ПараметрОффсетов, РезультатПодтверждениеОффсета);
КонецПроцедуры
// Перезаписать сдвиг.
//
// POST /consumers/(string:group_name)/instances/(string:instance)/positions
//
// Параметры:
// Подписчик - См. НовыйПодписчик
// СвойстваСдвига Свойства сдвига
Процедура ПерезаписатьСдвиг(Подписчик, СвойстваСдвига) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
МассивАдреса.Добавить("instances");
МассивАдреса.Добавить(Подписчик.ИмяЭкземпляраПодписчика);
МассивАдреса.Добавить("positions");
Адрес = СтрСоединить(МассивАдреса, "/");
Оффсет = Новый Структура;
Оффсет.Вставить("topic", СвойстваСдвига.Топик);
Оффсет.Вставить("partition", СвойстваСдвига.Партиция);
Оффсет.Вставить("offset", СвойстваСдвига.Сдвиг);
МассивОффсетов = Новый Массив;
МассивОффсетов.Добавить(Оффсет);
ПараметрОффсетов = Новый Структура;
ПараметрОффсетов.Вставить("offsets", МассивОффсетов);
ПараметрОффсетов = КоннекторHTTP.ОбъектВJson(ПараметрОффсетов);
РезультатПодтверждениеОффсета = КоннекторHTTP.Post(Адрес, ПараметрОффсетов, Заголовки);
ВыполнитьЛогированиеПерезаписатьСдвиг(Подписчик, "Post", ПараметрОффсетов, РезультатПодтверждениеОффсета);
КонецПроцедуры
// Назначить получателю топик и раздел. Используется для ручных назначений.
//
// POST /consumers/(string:group_name)/instances/(string:instance)/assignments
//
// Параметры:
// Подписчик - См. НовыйПодписчик
// Топик - Строка - Топик
// Раздел - Строка - Номер партиции
Процедура НазначитьПолучателюТопикИРаздел(Подписчик, Топик, Раздел) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
МассивАдреса.Добавить("instances");
МассивАдреса.Добавить(Подписчик.ИмяЭкземпляраПодписчика);
МассивАдреса.Добавить("assignments");
Адрес = СтрСоединить(МассивАдреса, "/");
ТопикИРаздел = Новый Структура;
ТопикИРаздел.Вставить("topic", Топик);
ТопикИРаздел.Вставить("partition", Раздел);
МассивТопиковИРазделов = Новый Массив;
МассивТопиковИРазделов.Добавить(ТопикИРаздел);
ПараметрОффсетов = Новый Структура;
ПараметрОффсетов.Вставить("partitions", МассивТопиковИРазделов);
ТелоЗапроса = КоннекторHTTP.ОбъектВJson(ПараметрОффсетов);
РезультатПодтверждениеОффсета = КоннекторHTTP.Post(Адрес, ТелоЗапроса, Заголовки);
ВыполнитьЛогированиеНазначитьПолучателюТопикИРаздел(Подписчик, "Post", ТелоЗапроса, РезультатПодтверждениеОффсета);
КонецПроцедуры
// Удалить подисчика. Выполняет удаление подписчика. Подписчика после выполнения операций необходимо удалять всегда.
//
// DELETE /consumers/(string:group_name)/instances/(string:instance)
//
// Параметры:
// Подписчик - См. НовыйПодписчик
Процедура УдалитьПодисчика(Подписчик) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
МассивАдреса.Добавить("instances");
МассивАдреса.Добавить(Подписчик.ИмяЭкземпляраПодписчика);
Адрес = СтрСоединить(МассивАдреса, "/");
// Удаление подписчика
РезультатУдаления = КоннекторHTTP.Delete(Адрес, , Заголовки);
ВыполнитьЛогированиеУдалитьПодисчика(Подписчик, "Delete", РезультатУдаления);
КонецПроцедуры
// Подписаться на топик.
//
// POST /consumers/(string:group_name)/instances/(string:instance)/subscription
//
// Параметры:
// Подписчик - См. НовыйПодписчик
// Топик - Строка - Топик, на который необходимо подписаться.
Процедура Подписаться(Подписчик, Топик) Экспорт
ОписаниеСоединения = Подписчик.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("consumers");
МассивАдреса.Добавить(Подписчик.ИмяГруппыПодписчиков);
МассивАдреса.Добавить("instances");
МассивАдреса.Добавить(Подписчик.ИмяЭкземпляраПодписчика);
МассивАдреса.Добавить("subscription");
Адрес = СтрСоединить(МассивАдреса, "/");
ТемаПодписки = Новый Массив();
ТемаПодписки.Добавить(Топик);
ТопикиПодписки = Новый Структура;
ТопикиПодписки.Вставить("topics", ТемаПодписки);
ТелоЗапросаПодписки = КоннекторHTTP.ОбъектВJson(ТопикиПодписки);
РезультатПодписки = КоннекторHTTP.Post(Адрес, ТелоЗапросаПодписки, Заголовки);
ВыполнитьЛогированиеПодписаться(Подписчик, "Post", ТелоЗапросаПодписки, РезультатПодписки);
КонецПроцедуры
// Отправить сообщения. Отпавляет сообщения, при этом сообщения ранее должны быть добавлены методом ДобавитьСообщение.
//
// POST /clusters/{cluster_id}/topics/{topic_name}/records
//
// Параметры:
// Отправитель - см. НовыйОтправитель
Процедура ОтправитьСообщения(Отправитель) Экспорт
ОписаниеСоединения = Отправитель.ОписаниеСоединения;
Заголовки = ЗаголовкиСоединения(ОписаниеСоединения, Истина, Ложь);
МассивАдреса = Новый Массив;
МассивАдреса.Добавить(ОписаниеСоединения.Адрес);
МассивАдреса.Добавить("topics");
Адрес = СтрСоединить(МассивАдреса, "/");
СообщенияПоТопикам = Отправитель.СообщенияПоТопикам;
Для Каждого ТопикСообщения Из СообщенияПоТопикам Цикл
Топик = ТопикСообщения.Ключ;
Сообщения = ТопикСообщения.Значение;
АдресТопика = Адрес + "/" + Топик;
СоответствиеСообщений = Новый Структура("records", Сообщения);
СообщенияПодготовленные = КоннекторHTTP.ОбъектВJson(СоответствиеСообщений);
РезультатОтправкиСообщения = КоннекторHTTP.Post(АдресТопика, СообщенияПодготовленные, Заголовки);
ВыполнитьЛогированиеОтправитьСообщения(Отправитель, "Post", СообщенияПодготовленные, РезультатОтправкиСообщения);
КонецЦикла;
КонецПроцедуры
// Добавить сообщение. Добавляет сообщение в контекст отправителя, сообщения отправляются методом ОтправитьСообщения.
//
// Параметры:
// Отправитель - см. НовыйОтправитель
// Сообщение - Строка - Текст отправляемого сообщения
// Топик - Строка - Топик, в которые будет отправлено сообщение
// Ключ - Неопределено, Строка - Ключ сообщения.
// Раздел - Неопределено, Строка - Раздел, партиция, в которую необходимо отправить сообщение.
Процедура ДобавитьСообщение(Отправитель, Сообщение, Топик, Ключ = Неопределено, Раздел = Неопределено) Экспорт
НовоеСообщение = "";
Если Отправитель.ОписаниеСоединения.Формат = "json" Тогда
ПараметрыСообщения = Новый Структура;
Если ЗначениеЗаполнено(Раздел) Тогда
ПараметрыСообщения.Вставить("partition", Раздел);
КонецЕсли;
Если ЗначениеЗаполнено(Ключ) Тогда
ПараметрыСообщения.Вставить("key", Ключ);
КонецЕсли;
ПараметрыСообщения.Вставить("value", Сообщение);
НовоеСообщение = ПараметрыСообщения;
КонецЕсли;
СообщенияПоТопикам = Отправитель.СообщенияПоТопикам;
МассивСообщенийТопика = СообщенияПоТопикам.Получить(Топик);
Если МассивСообщенийТопика <> Неопределено Тогда
МассивСообщенийТопика.Добавить(НовоеСообщение);
Иначе
МассивСообщенийТопика = Новый Массив;
МассивСообщенийТопика.Добавить(НовоеСообщение);
КонецЕсли;
СообщенияПоТопикам.Вставить(Топик, МассивСообщенийТопика);
ВыполнитьЛогированиеДобавлениеСообщения(НовоеСообщение, Отправитель, Ключ, Раздел, Топик);
КонецПроцедуры
#КонецОбласти
#Область Логирование
Процедура ВыполнитьЛогированиеНазначитьПолучателюТопикИРаздел(Подписчик, МетодHTTP, ТекстЗапроса, Ответ)
ОписаниеДействия = НСтр("ru = 'Назначение получателю топика и раздела'");
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия)
КонецПроцедуры
Процедура ВыполнитьЛогированиеПерезаписатьСдвиг(Подписчик, МетодHTTP, ТекстЗапроса, Ответ)
ОписаниеДействия = НСтр("ru = 'Перезапись сдвига'");
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия);
КонецПроцедуры
Процедура ВыполнитьЛогированиеПодтвердитьПолучение(Подписчик, МетодHTTP, ТекстЗапроса, Ответ)
ОписаниеДействия = НСтр("ru = 'Подтверждение получения'");
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия);
КонецПроцедуры
Процедура ВыполнитьЛогированиеУдалитьПодисчика(Подписчик, МетодHTTP, Ответ)
ОписаниеДействия = НСтр("ru = 'Удаление подписчика'");
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, "<empty>", Ответ, ОписаниеДействия);
КонецПроцедуры
Процедура ВыполнитьЛогированиеПолучитьСообщения(Подписчик, МетодHTTP, Ответ, МиниммальныйСдвиг, МаксимальныйСдвиг)
ОписаниеДействия = НСтр("ru = 'Получение сообщений'");
ШаблонТелоЗапроса = НСтр("ru = '<empty> / Получены сообщения сдвигов: с %1 по %2'");
ТелоЗапроса = СтрШаблон(ШаблонТелоЗапроса, МиниммальныйСдвиг, МаксимальныйСдвиг);
//Тело ответа не выводится специально, т.к. оно может быть очень большим.
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, ТелоЗапроса, Ответ, ОписаниеДействия, Истина, Ложь);
КонецПроцедуры
Процедура ВыполнитьЛогированиеЗарегистрироватьПодписчика(Подписчик, МетодHTTP, ТекстЗапроса, Ответ)
ОписаниеДействия = НСтр("ru = 'Зарегистирован подписчик'");
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия);
КонецПроцедуры
Процедура ВыполнитьЛогированиеПодписаться(Подписчик, МетодHTTP, ТекстЗапроса, Ответ)
ОписаниеДействия = НСтр("ru = 'Подписка'");
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия);
КонецПроцедуры
Процедура ВыполнитьЛогированиеНовыйПодписчик(Знач ОписаниеСоединения)
Сообщение = НСтр("ru = 'Создано описание структуры подписчика!'");
ДополнитьПротоколСоединения(ОписаниеСоединения, Сообщение, Ложь);
КонецПроцедуры
Процедура ВыполнитьЛогированиеНовыйОтправитель(Знач ОписаниеСоединения)
Сообщение = НСтр("ru = 'Создано описание структуры отправителя!'");
ДополнитьПротоколСоединения(ОписаниеСоединения, Сообщение, Ложь);
КонецПроцедуры
Процедура ВыполнитьЛогированиеОтправитьСообщения(Отправитель, МетодHTTP, ТекстЗапроса, Ответ)
ОписаниеДействия = НСтр("ru = 'Отправка сообщений'");
ВыполнитьЛогированиеУниверсальное(Отправитель, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия);
КонецПроцедуры
Процедура ВыполнитьЛогированиеДобавлениеСообщения(НовоеСообщение, Отправитель, Ключ, Раздел, Топик)
ШаблонСообщения = НСтр("ru = 'Добавлено сообщение
| Текст сообщения: %1
| Топик: %2
| Ключ: %3
| Раздел: %4'");
Если ТипЗнч(НовоеСообщение) = Тип("Строка") Тогда
ТекстСообщения = НовоеСообщение;
Иначе
ТекстСообщения = КоннекторHTTP.ОбъектВJson(НовоеСообщение);
КонецЕсли;
Сообщение = СтрШаблон(ШаблонСообщения, ТекстСообщения, Топик, Ключ, Раздел);
ДополнитьПротоколСоединения(Отправитель.ОписаниеСоединения, Сообщение, Ложь);
КонецПроцедуры
Процедура ВыполнитьЛогированиеПолучитьСдвиги(Подписчик, МетодHTTP, ТекстЗапроса, Ответ)
ОписаниеДействия = НСтр("ru = 'Получение сдвигов'");
ВыполнитьЛогированиеУниверсальное(Подписчик, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия);
КонецПроцедуры
Процедура ВыполнитьЛогированиеУниверсальное(Инициатор, МетодHTTP, ТекстЗапроса, Ответ, ОписаниеДействия, ВыводитьТелоЗапроса = Истина, ВыводитьТелоОтвета = Истина)
ТелоОтвет = КоннекторHTTP.КакТекст(Ответ);
ТелоЗапрос = ТекстЗапроса;
Если Не ВыводитьТелоОтвета Тогда
ТелоОтвет = НСтр("ru = '<отключен вывод>'");
КонецЕсли;
ШаблонСообщения = НСтр("ru = '
|Выполнено: %1
| URL: %2
| Метод http: %3
| Тело запроса: %4
| Код ответа: %5
| Тело ответа %6
| Время выполнения: %7'");
Сообщение = СтрШаблон(
ШаблонСообщения,
ОписаниеДействия,
Ответ.URL,
МетодHTTP,
ТелоЗапрос,
Ответ.КодСостояния,
ТелоОтвет,
Ответ.ВремяВыполнения);
ДополнитьПротоколСоединения(Инициатор.ОписаниеСоединения, Сообщение, Ложь);
КонецПроцедуры
#КонецОбласти
#Область СлужебныеПроцедурыИФункции
Процедура ВыполнитьКонтрольДопустимыхФорматов(СвойстваСоединения, ВозможныеФорматы)
Формат = СвойстваСоединения.Формат;
Если ВозможныеФорматы.Найти(Формат) = Неопределено Тогда
ЭтоОшибка = Истина;
ШаблонСообщения = НСтр("ru = 'Формат обмена: %1 не поддерживается данным модулем!'");
Сообщение = СтрШаблон(ШаблонСообщения, Формат);
Иначе
ЭтоОшибка = Ложь;
ШаблонСообщения = НСтр("ru = 'Проверка выполнена успешно. Формат обмена: %1 поддерживается данным модулем.'");
Сообщение = СтрШаблон(ШаблонСообщения, Формат);
КонецЕсли;
ДополнитьПротоколСоединения(СвойстваСоединения, Сообщение, ЭтоОшибка)
КонецПроцедуры
Процедура ДополнитьПротоколСоединения(СвойстваСоединения, Сообщение, ЭтоОшибка)
РезультатСоединения = СвойстваСоединения.РезультатСоединения;
РезультатСоединения.ИсторияОпераций.Добавить(Сообщение);
РезультатСоединения.ИнформацияОбОперации = Сообщение;
РезультатСоединения.ОшибкаВыполнения = ЭтоОшибка;
КонецПроцедуры
Функция СвойстваПодписчика()
СтруктураПодписчика = Новый Структура;
СтруктураПодписчика.Вставить("ОписаниеСоединения");
СтруктураПодписчика.Вставить("ИмяГруппыПодписчиков");
СтруктураПодписчика.Вставить("ИмяЭкземпляраПодписчика");
СтруктураПодписчика.Вставить("ИспользоватьАвтоподтверждение");
СтруктураПодписчика.Вставить("МаксимальныйРазмерОтветаБайт");
Возврат СтруктураПодписчика;
КонецФункции
Функция СвойстваОтправителя()
СообщенияПоТопикам = Новый Соответствие;
СтруктураОтправитея = Новый Структура;
СтруктураОтправитея.Вставить("ОписаниеСоединения");
СтруктураОтправитея.Вставить("СообщенияПоТопикам", СообщенияПоТопикам);
Возврат СтруктураОтправитея;
КонецФункции
Процедура ЗаполнитьЗаголовки(Знач СвойстваСоединения, Знач ФорматОбмена)
Если ФорматОбмена = "json" Тогда
Заголовки = Новый Соответствие;
Заголовки.Вставить("Content-Type", "application/vnd.kafka.json.v2+json");
Заголовки.Вставить("Accept", "application/vnd.kafka.json.v2+json");
ЗаголовкиСтруктура = Новый Структура;
ЗаголовкиСтруктура.Вставить("Заголовки", Заголовки);
СвойстваСоединения.Вставить("Заголовки", ЗаголовкиСтруктура);
КонецЕсли;
КонецПроцедуры
Процедура СгенерироватьИмяПустомуЗначению(Имя)
Если Имя = Неопределено Тогда
Имя = СокрЛП(Новый УникальныйИдентификатор());
КонецЕсли;
КонецПроцедуры
Функция ЗаголовкиСоединения(Знач ОписаниеСоединения, ЗаполнитьТипОтправляемогоКонтента = Истина, ЗаполнитьТипПринимаемогоКонтента = Истина)
КопияСтруктуры = Новый Структура;
Для Каждого ЭлементСтруктуры Из ОписаниеСоединения.Заголовки Цикл
КопияСтруктуры.Вставить(ЭлементСтруктуры.Ключ, ЭлементСтруктуры.Значение);
КонецЦикла;
Заголовки = КопияСтруктуры;
Если ЗаполнитьТипПринимаемогоКонтента = Ложь Тогда
Заголовки.Заголовки.Удалить("Accept");
КонецЕсли;
Если ЗаполнитьТипОтправляемогоКонтента = Ложь Тогда
Заголовки.Заголовки.Удалить("Content-Type");
КонецЕсли;
Возврат Заголовки;
КонецФункции
#КонецОбласти