};
}
-export function sendKafkaMessages() {
+export function sendBatchOfKafkaMessages(batchSize = 250) {
+ const messages = [];
const cloudEventHeaders = getCloudEventHeaders();
const networkElementId = getRandomNetworkElement();
- const avcCloudEvent = {
- key: schemaRegistry.serialize({
- data: networkElementId,
- schemaType: SCHEMA_TYPE_STRING,
- }),
- value: schemaRegistry.serialize({
- data: testEventPayload,
- schemaType: SCHEMA_TYPE_STRING
- }),
- headers: cloudEventHeaders
- };
+ for (let i = 0; i < batchSize; i++) {
+ const avcCloudEvent = {
+ key: schemaRegistry.serialize({
+ data: networkElementId,
+ schemaType: SCHEMA_TYPE_STRING,
+ }),
+ value: schemaRegistry.serialize({
+ data: testEventPayload,
+ schemaType: SCHEMA_TYPE_STRING
+ }),
+ headers: cloudEventHeaders
+ };
+ messages.push(avcCloudEvent);
+ }
try {
- kafkaProducer.produce({messages: [avcCloudEvent]});
- messagesSent++;
- const isMessageSent = check(kafkaProducer, {
- 'Message sent successfully': (producer) => producer != null,
+ kafkaProducer.produce({messages: messages});
+ messagesSent += messages.length;
+ const isBatchSent = check(kafkaProducer, {
+ ['Batch of ${batchSize} messages sent successfully']: (producer) => producer != null,
});
- if (!isMessageSent) {
- console.error('Failed to send message:', avcCloudEvent);
+ if (!isBatchSent) {
+ console.error('Failed to send batch of messages, batch size : ', batchSize);
}
} catch (error) {
console.error(`Error during message production: ${error.message}`, avcCloudEvent);
import { createCmHandles, deleteCmHandles, waitForAllCmHandlesToBeReady } from './common/cmhandle-crud.js';
import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js';
import { passthroughRead, passthroughWrite, legacyBatchRead } from './common/passthrough-crud.js';
-import { sendKafkaMessages } from './common/produce-avc-event.js';
+import { sendBatchOfKafkaMessages } from './common/produce-avc-event.js';
let cmHandlesCreatedPerSecondTrend = new Trend('cmhandles_created_per_second', false);
let cmHandlesDeletedPerSecondTrend = new Trend('cmhandles_deleted_per_second', false);
}
export function produceAvcEventsScenario() {
- sendKafkaMessages();
+ sendBatchOfKafkaMessages(250);
}
export function legacyBatchConsumeScenario() {