6

请教一个关于线程条件变量的问题

 3 years ago
source link: https://www.v2ex.com/t/794758
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

V2EX  ›  C

请教一个关于线程条件变量的问题

  commoccoom · 1 天前 · 686 次点击

两个线程,线程 1(processData)生成数据 p 并写入数据库,另一个线程 2(someSocket)将线程 1 生成的数据 p 通过 socket 发送到客户端。

当线程 1 中的for循环结束时,如何通知线程 2 while(res == TRUE)应当结束了,线程 2 因为 pthread_cond_wait 一直在阻塞,但是此时线程 1 不会再发出信号了。这是不是就是死锁了。。。

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <mysql.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#define TRUE 1
#define FALSE 0
#define MAX_STRING 128
#define PORT 3389
#define SA struct sockaddr

pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;

// 存储温湿度结构体
typedef struct {
    int temp;
    int humd;
}humiture;

// 可变长消息体
typedef struct { 
    int nLen; 
    char data[ 0];
}MyMessage;

//全局变量
humiture p;
int res = TRUE;

void error(char *msg)
{
    fprintf(stderr, "%s: %s\n", msg, strerror(errno));
    exit(1);
}

void info(char *msg)
{
    fprintf(stdout,"%s\n",msg);
}

void finish_with_error(MYSQL *con)
{
  fprintf(stderr, "%s\n", mysql_error(con));
  mysql_close(con);
  exit(1);
}

// socket 发送数据
int sendall(int s, char *buf, int *len)
{
    int total = 0;
    int bytesleft = *len;
    int n;

    while(total < *len) {
        n = send(s, buf+total, bytesleft, 0);
        if (n == -1) { break; }
        total += n;
        bytesleft -= n;
    }

    *len = total;

    return n==-1?-1:0;
} 

// 生成数据
humiture  collectData()
{
    int temperature,humidity;
    srand((unsigned)time(NULL));    // 根据时间来播种随机数种子
    // 生成数据
    temperature = rand()%40+10;     // 生成 10~50 的随机数 当做温度
    humidity = rand()%70+10;    // 生成 10~80 的随机数当做湿度
    
    humiture p = {humidity, temperature};
    
    return p;
}

// 启动 MySQL 建立连接
MYSQL* startMysql()
{
    MYSQL *con = mysql_init(NULL);

    if (con == NULL)
    {        
        fprintf(stderr, "%s\n", mysql_error(con));
        exit(1);
    }

    if (mysql_real_connect(con, "localhost", "root", "root#admin","test", 0, NULL, 0) == NULL)
    {
        finish_with_error(con);
    }

    return con;
}

// 生成数据并存入数据库
void * processData()
{
    MYSQL * con = startMysql();

    for(int i = 0; i <20;i++)
    {
       
        pthread_mutex_lock(&plock);
        p = collectData();
        pthread_cond_signal(&pready);
        pthread_mutex_unlock(&plock);

        char query[MAX_STRING] = {0};

        snprintf(query,MAX_STRING,"INSERT INTO humiture (temperature,humidity) VALUES (%d,%d)", p.temp, p.humd);

        if (mysql_query(con, query)) 
        {
            finish_with_error(con);
        }
        sleep(2);
    } 

    // 循环结束给出信号
    res = FALSE;

    mysql_close(con); 
    mysql_library_end();
    return NULL;
}

void * someSocket()
{
    int sockfd, connfd;
    struct sockaddr_in servaddr, cli;
    socklen_t len;
    char buff[10];
    
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        error("socket creation failed...");
    }
    else
        info("Socket successfully created...");

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);

    int reuse = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(int)) == -1)
        error("Can't set the reuse option on the socket...");

    if ((bind(sockfd, (SA*)&servaddr, sizeof(servaddr))) != 0) {
        error("socket bind failed...");
    }
    else
        fprintf(stdout,"%s\n","Socket successfully binded...");

    if ((listen(sockfd, 5)) != 0) {
        error("Listen failed...");
    }
    else
       info("Server listening...");    

    while(TRUE)
    {
        len = sizeof(cli);        
        connfd = accept(sockfd, (SA*)&cli, &len);    
        if (connfd < 0) {
            error("server acccept failed...");
        }
        else
            info("server acccept the client...");
        
        MyMessage * myMessage = (MyMessage*)malloc(sizeof(MyMessage)+sizeof(humiture));
        int needSend = sizeof(MyMessage)+sizeof(humiture);  
        char *buffer =(char*)malloc(needSend);

        while(res == TRUE)
        {
            myMessage->nLen = htonl(sizeof(humiture));
            pthread_mutex_lock(&plock);
            pthread_cond_wait(&pready,&plock);
            memcpy(myMessage->data,&p,sizeof(humiture));
            pthread_mutex_unlock(&plock);
            memcpy(buffer,myMessage,needSend);
            sendall(connfd,buffer,&needSend);
            recv(connfd,buff,sizeof(buff),0);
        }
        // 当需要停止的时候发送 0 字节信息让客户端停止循环
        if(res == FALSE) 
        {
            // 将发送消息定义为 0
            myMessage->nLen = htonl(res);
            char *buffer =(char*)malloc(sizeof(int));
            memcpy(buffer,myMessage,sizeof(MyMessage));
            send(connfd,buffer,sizeof(MyMessage),0);
            shutdown(connfd,SHUT_RDWR);
            free(buffer);
        }

        free(myMessage);
        free(buffer);
        close(connfd);
        break;
    }
    close(sockfd);    
    return NULL;
}

int main(void)
{
    pthread_t t0,t1;

    if(pthread_create(&t0, NULL,processData,NULL)==-1)
    {
        error("Can't create thread processData");
    }
    if(pthread_create(&t1,NULL,someSocket,NULL)==-1)
    {
         error("Can't create thread someSocket");
    }

    void *reslut;
    if(pthread_join(t0,&reslut)==-1)
    {
        error("Can't reclaim thread t0");
    }
    if(pthread_join(t1,&reslut)==-1)
    {
        error("Can't reclaim thread t1");
    }

    return 0;
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK