Call async function in class constructor in NodeJS

  Kiến thức lập trình

Is it okay to call an async function in a class constructor? I'm not expecting any return value from the call; I just want to call that async function to establish the Kafka connection.

I call the async connect method from the constructor because I just want the producer to connect. See the code below.

Code is something like this:

const httpStatus = require('http-status');
const { Kafka, Partitioners } = require('kafkajs');

const logger = require('../logger');

const { KAFKA_ACKS } = require('../constants/kafka-constants');
const { KafkaInstanceError } = require('../../utils/custom-errors/class-errors');
const { KAFKA_INSTANCE_ERRORS } = require('../constants/error-messages');
const { HTTP_RESPONSE_CODE } = require('../../utils/response-codes');

/**
 * Thin wrapper around a kafkajs producer.
 *
 * Constructing a Producer starts connecting in the background. Constructors
 * cannot be async, so the pending connection is stored as a promise and
 * awaited by `produceEvent` before anything is sent. Instances created with
 * the same `key` share one connected kafkajs producer; the configuration of
 * the first instance for a key is the one used to create it.
 */
class Producer {
  #config;

  #key;

  #producerInstance;

  // Settles once this wrapper holds a connected producer, or rejects if connecting failed.
  #ready;

  // key -> promise resolving to the connected producer shared by that key.
  // A Map (rather than a plain object) keeps keys such as "constructor" from
  // matching inherited Object.prototype properties.
  static #instances = new Map();

  /**
   * @param {string} key - Identifier used to share one connection between instances.
   * @param {object} [kafkaConfig] - Reads `kafka` (client options), `producer`
   *   (producer options), `topic` and `producerSendOptions.acks`.
   */
  constructor(key, kafkaConfig = {}) {
    this.#config = kafkaConfig;
    this.#key = key;

    this.#ready = this.#connect();
    // A constructor cannot await. Log the failure here so it never becomes an
    // unhandled rejection; the same error resurfaces when produceEvent awaits #ready.
    this.#ready.catch((error) => {
      logger.error(error.message);
    });
  }

  /**
   * Reuses the connection already registered for this key, or creates and
   * registers a new one.
   *
   * @returns {Promise<void>}
   */
  async #connect() {
    let pending = Producer.#instances.get(this.#key);

    if (!pending) {
      pending = this.#createConnectedProducer();
      Producer.#instances.set(this.#key, pending);
    }

    try {
      this.#producerInstance = await pending;
    } catch (error) {
      // Drop the failed attempt so a later instance with this key can retry.
      if (Producer.#instances.get(this.#key) === pending) {
        Producer.#instances.delete(this.#key);
      }
      throw error;
    }
  }

  /**
   * Builds a producer and connects it. Being async, a synchronous failure in
   * `#initProducer` is reported as a rejection as well.
   *
   * @returns {Promise<object>} The connected kafkajs producer.
   */
  async #createConnectedProducer() {
    const producer = this.#initProducer();
    await producer.connect();
    return producer;
  }

  /**
   * Creates an unconnected kafkajs producer from the stored configuration.
   *
   * @returns {object} A kafkajs producer.
   */
  #initProducer() {
    const kafka = new Kafka(this.#config.kafka);
    const producer = kafka.producer({
      ...this.#config?.producer,
      createPartitioner: Partitioners.DefaultPartitioner,
    });
    return producer;
  }

  /**
   * Sends a single message to the configured topic, waiting for the
   * connection to be established first.
   *
   * @param {object} [events] - The message to send; must have at least one key.
   * @returns {Promise<object>} The `events` object that was sent.
   * @throws {KafkaInstanceError} When `events` is empty or null, when the
   *   connection could not be established, or when sending fails.
   */
  async produceEvent(events = {}) {
    try {
      // The default parameter only covers undefined; treat null as empty too.
      if (events == null || Object.keys(events).length === 0) {
        throw new KafkaInstanceError(
          KAFKA_INSTANCE_ERRORS.EVENT_OBJECT_EMPTY,
          HTTP_RESPONSE_CODE.DATA_PROCESSING_ERROR,
          httpStatus.BAD_REQUEST,
        );
      }

      // Connecting was started in the constructor; wait for it to finish.
      await this.#ready;

      logger.info('sending events to kafka notification');

      await this.#producerInstance.send({
        topic: this.#config.topic,
        // `??` rather than `||`: an explicitly configured acks of 0 must be kept.
        acks: this.#config?.producerSendOptions?.acks ?? KAFKA_ACKS.ALL,
        messages: [events],
      });

      logger.info('event sent to kafka notification');
      return events;
    } catch (error) {
      logger.error(error.message);
      // Errors raised above are already of the right type; do not re-wrap them.
      if (error instanceof KafkaInstanceError) throw error;
      throw new KafkaInstanceError(
        error.message,
        error.code || HTTP_RESPONSE_CODE.INTERNAL_SERVER_ERROR,
        error.statusCode || httpStatus.INTERNAL_SERVER_ERROR,
      );
    }
  }
}

// Public surface of this module: only the Producer class is exported.
module.exports = { Producer };

LEAVE A COMMENT