两个线程,线程 1(processData)生成数据 p 并写入数据库,另一个线程 2(someSocket)将线程 1 生成的数据 p 通过 socket 发送到客户端。
当线程 1 中的for循环结束时,如何通知线程 2 while(res == TRUE)应当结束了,线程 2 因为 pthread_cond_wait 一直在阻塞,但是此时线程 1 不会再发出信号了。这是不是就是死锁了。。。
代码如下
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <mysql.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#define TRUE 1
#define FALSE 0
#define MAX_STRING 128
#define PORT 3389
#define SA struct sockaddr
pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;
// 存储温湿度结构体
typedef struct {
    int temp;
    int humd;
}humiture;
// 可变长消息体
typedef struct { 
    int nLen; 
    char data[ 0];
}MyMessage;
//全局变量
humiture p;
int res = TRUE;
void error(char *msg)
{
    fprintf(stderr, "%s: %s\n", msg, strerror(errno));
    exit(1);
}
void info(char *msg)
{
    fprintf(stdout,"%s\n",msg);
}
void finish_with_error(MYSQL *con)
{
  fprintf(stderr, "%s\n", mysql_error(con));
  mysql_close(con);
  exit(1);
}
// socket 发送数据
int sendall(int s, char *buf, int *len)
{
    int total = 0;
    int bytesleft = *len;
    int n;
    while(total < *len) {
        n = send(s, buf+total, bytesleft, 0);
        if (n == -1) { break; }
        total += n;
        bytesleft -= n;
    }
    *len = total;
    return n==-1?-1:0;
} 
// 生成数据
humiture  collectData()
{
    int temperature,humidity;
    srand((unsigned)time(NULL));    // 根据时间来播种随机数种子
    // 生成数据
    temperature = rand()%40+10;     // 生成 10~50 的随机数 当做温度
    humidity = rand()%70+10;    // 生成 10~80 的随机数当做湿度
    
    humiture p = {humidity, temperature};
    
    return p;
}
// 启动 MySQL 建立连接
MYSQL* startMysql()
{
    MYSQL *con = mysql_init(NULL);
    if (con == NULL)
    {        
        fprintf(stderr, "%s\n", mysql_error(con));
        exit(1);
    }
    if (mysql_real_connect(con, "localhost", "root", "root#admin","test", 0, NULL, 0) == NULL)
    {
        finish_with_error(con);
    }
    return con;
}
// 生成数据并存入数据库
void * processData()
{
    MYSQL * con = startMysql();
    for(int i = 0; i <20;i++)
    {
       
        pthread_mutex_lock(&plock);
        p = collectData();
        pthread_cond_signal(&pready);
        pthread_mutex_unlock(&plock);
        char query[MAX_STRING] = {0};
        snprintf(query,MAX_STRING,"INSERT INTO humiture (temperature,humidity) VALUES (%d,%d)", p.temp, p.humd);
        if (mysql_query(con, query)) 
        {
            finish_with_error(con);
        }
        sleep(2);
    } 
    // 循环结束给出信号
    res = FALSE;
    mysql_close(con); 
    mysql_library_end();
    return NULL;
}
void * someSocket()
{
    int sockfd, connfd;
    struct sockaddr_in servaddr, cli;
    socklen_t len;
    char buff[10];
    
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        error("socket creation failed...");
    }
    else
        info("Socket successfully created...");
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);
    int reuse = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(int)) == -1)
        error("Can't set the reuse option on the socket...");
    if ((bind(sockfd, (SA*)&servaddr, sizeof(servaddr))) != 0) {
        error("socket bind failed...");
    }
    else
        fprintf(stdout,"%s\n","Socket successfully binded...");
    if ((listen(sockfd, 5)) != 0) {
        error("Listen failed...");
    }
    else
       info("Server listening...");    
    while(TRUE)
    {
        len = sizeof(cli);        
        connfd = accept(sockfd, (SA*)&cli, &len);    
        if (connfd < 0) {
            error("server acccept failed...");
        }
        else
            info("server acccept the client...");
        
        MyMessage * myMessage = (MyMessage*)malloc(sizeof(MyMessage)+sizeof(humiture));
        int needSend = sizeof(MyMessage)+sizeof(humiture);  
        char *buffer =(char*)malloc(needSend);
        while(res == TRUE)
        {
            myMessage->nLen = htonl(sizeof(humiture));
            pthread_mutex_lock(&plock);
            pthread_cond_wait(&pready,&plock);
            memcpy(myMessage->data,&p,sizeof(humiture));
            pthread_mutex_unlock(&plock);
            memcpy(buffer,myMessage,needSend);
            sendall(connfd,buffer,&needSend);
            recv(connfd,buff,sizeof(buff),0);
        }
        // 当需要停止的时候发送 0 字节信息让客户端停止循环
        if(res == FALSE) 
        {
            // 将发送消息定义为 0
            myMessage->nLen = htonl(res);
            char *buffer =(char*)malloc(sizeof(int));
            memcpy(buffer,myMessage,sizeof(MyMessage));
            send(connfd,buffer,sizeof(MyMessage),0);
            shutdown(connfd,SHUT_RDWR);
            free(buffer);
        }
        free(myMessage);
        free(buffer);
        close(connfd);
        break;
    }
    close(sockfd);    
    return NULL;
}
int main(void)
{
    pthread_t t0,t1;
    if(pthread_create(&t0, NULL,processData,NULL)==-1)
    {
        error("Can't create thread processData");
    }
    if(pthread_create(&t1,NULL,someSocket,NULL)==-1)
    {
         error("Can't create thread someSocket");
    }
    void *reslut;
    if(pthread_join(t0,&reslut)==-1)
    {
        error("Can't reclaim thread t0");
    }
    if(pthread_join(t1,&reslut)==-1)
    {
        error("Can't reclaim thread t1");
    }
    return 0;
}
|      1wevsty      2021-08-10 09:29:11 +08:00 线程 1 退出的时候再发一个 pthread_cond_signal 这样线程 2 收到信号就不会锁死了。最后线程 2 检查一下退出标志再发数据不就行了么? | 
|  |      2commoccoom OP @wevsty 确实,我人傻了😂 | 
|      3FranzKafka95      2021-08-10 13:11:31 +08:00 via Android 或者线程 1 执行完 for 循环以后先别置 res 为 false,再加一个条件变量在这儿等,等到线程 2 发送完以后通过信号量通知线程 1 再置为 false,等到线程 2 再执行 while 循环时 res 已经为 flalse,这样就可以同步了。 | 
|  |      4commoccoom OP @FranzKafka95 后续我的想法是线程 1(processData)会一直循环,线程 2(someSocket)是接收别的信号,然后启动或者停止这样。所以线程 1 不能阻塞,线程 2 可以阻塞。线程 1 现在加了循环次数是因为之前我发现有内存泄漏,所以加个停止条件看看哪里有问题😂。 |