From 2c93f8bb73a5787b091e45c1ce9f98daf91733b9 Mon Sep 17 00:00:00 2001 From: hejun Date: Sun, 1 Jun 2025 02:00:29 -0400 Subject: [PATCH] learning msg queue including signal mutex and sem. 2025-6-1 at LongQuan --- src/msg_queu_test.c | 267 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 src/msg_queu_test.c diff --git a/src/msg_queu_test.c b/src/msg_queu_test.c new file mode 100644 index 0000000..2a74eef --- /dev/null +++ b/src/msg_queu_test.c @@ -0,0 +1,267 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG 0 + +volatile atomic_int sigrcved = 0; +// malloc and free + +void sig_handler(int sig) +{ + atomic_store(&sigrcved, 1); +} + +#define UNKONW_TYPE "unknown mem type" + +#define XMALLOC(SIZE, TYPE) xmalloc((SIZE), #TYPE) +#define XFREE(P, TYPE) xfree((P), #TYPE) + +#define LOG_INFO(INFO) printf("[INFO]-%s\n", #INFO); + +typedef enum +{ + FALSE = 0, + TRUE = 1 +} bool_t; + +void *xmalloc(size_t size, char *type) +{ + void *p = NULL; +#if DEBUG + printf("malloc %u bytes memory for %s\n", size, type ? type : UNKONW_TYPE); +#endif + p = malloc(size); + assert(p); + memset(p, 0x0, size); + + return p; +} + +void xfree(void *p, char *type) +{ + if (!p) + { +#if DEBUG + printf("free %s encounter one null pointer\n", type ? type : UNKONW_TYPE); +#endif + return; + } +#if DEBUG + printf("free %s\n", type ? type : UNKONW_TYPE); +#endif + free(p); +} + +// typedef unsigned int size_t; + +// msg define + +struct mmsg +{ + size_t len; + char msg[0]; +}; + +typedef struct mmsg msg_t; + +// msg queue + +struct mqueue +{ + + pthread_mutex_t mtx; + + size_t capcity; + + size_t head; + + size_t tail; + + sem_t queue_fully; + + sem_t queue_empty; + + msg_t *msg_lst[0]; +}; + +typedef struct mqueue *pMqueue_t; + +pMqueue_t mqueue_create(size_t cap) +{ + + pMqueue_t pq = NULL; + + if (cap < 1) + return NULL; + + pq = XMALLOC(sizeof(struct mqueue) + cap * sizeof(msg_t *), mqueue object); + pq->capcity = cap; + + return pq; +} + +bool_t mqueue_init(pMqueue_t mq) +{ + + if (!mq) + return FALSE; + + if (pthread_mutex_init(&mq->mtx, NULL) < 0) + { + LOG_INFO(failed to init mtx); + return FALSE; + } + + if (sem_init(&mq->queue_fully, 0, 0) < 0) + { + LOG_INFO(faild to init sem); + return FALSE; + } + + if (sem_init(&mq->queue_empty, 0, mq->capcity) < 0) + { + LOG_INFO(faild to init sem); + return FALSE; + } + + return TRUE; +} + +void mqueue_destory(pMqueue_t pq) +{ + + // may consume all unhandled msg first + + pthread_mutex_destroy(&pq->mtx); + + sem_destroy(&pq->queue_empty); + sem_destroy(&pq->queue_fully); + + XFREE(pq, "mqueu"); +} + +msg_t *get_new_msg(char *info) +{ + msg_t *pmsg = NULL; + + size_t len = strlen(info); + pmsg = XMALLOC(sizeof(msg_t) + len + 1, msg type); + + memcpy(pmsg->msg, info, len); + pmsg->len = len; + + return pmsg; +} + +void *producer_task(void *arg) +{ + + pMqueue_t pq = arg; + msg_t *msg = NULL; + int msg_id = 0; + char info[256] = {0}; + + if (pq == NULL) + return NULL; + + while (1) + { + if (sigrcved) + { + printf("proudcer stop as SIGINT recived\n"); + return NULL; + } + + sprintf(info, "msg from %lu at %d times", pthread_self(), msg_id++); + + msg = get_new_msg(info); + sem_wait(&pq->queue_empty); + + pthread_mutex_lock(&pq->mtx); + + int loc = (pq->head + pq->capcity) % pq->capcity; + + pq->msg_lst[loc] = msg; + pq->head++; + + pthread_mutex_unlock(&pq->mtx); + sem_post(&pq->queue_fully); + + msg = NULL; + memset(info, 0x0, sizeof(info)); + sleep(2); + } + + return NULL; +} + +void *consumer_task(void *arg) +{ + pMqueue_t pq = arg; + if (pq == NULL) + return NULL; + + msg_t *pmsg = NULL; + + while (1) + { + if (sigrcved) + { + printf("consumer stop as SIGINT recived\n"); + return NULL; + } + + sem_wait(&pq->queue_fully); + + pthread_mutex_lock(&pq->mtx); + + pmsg = pq->msg_lst[pq->tail]; + + pq->tail = (pq->tail + 1 + pq->capcity) % pq->capcity; + + pthread_mutex_unlock(&pq->mtx); + sem_post(&pq->queue_empty); + + printf("consumer recv msg:%s\n", pmsg->msg); + XFREE(pmsg, "msg type"); + + sleep(1); + } + + return NULL; +} + +int main(int argc, char const *argv[]) +{ + + signal(SIGINT, sig_handler); + + pMqueue_t pq; + pq = mqueue_create(5); + mqueue_init(pq); + +#define PRODUCER_NUM 2 + + pthread_t producer[PRODUCER_NUM] = {0}, consumer; + + for (int idx = 0; idx < PRODUCER_NUM; ++idx) + { + pthread_create(&producer[idx], NULL, producer_task, pq); + } + + pthread_create(&consumer, NULL, consumer_task, pq); + + pthread_join(consumer, NULL); + + printf("main thread stop\n"); + + return 0; +}