CXD Linux Engineer

pthread多线程编程

2017-12-05

线程的创建与退出

线程的创建

POSIX线程使用pthread_create()函数创建,注意在编译多线程程序的时候需要加上-lpthread选项,新创建的线程从thread_function函数的地址开始运行,该函数只有一个无类型的指针参数,线程创建成功会将线程ID存放在thread_id参数中,attributes参数用于设置新线程的属性。

#include <pthread.h>
int pthread_create (pthread_t *thread_id, const pthread_attr_t *attributes, void *(*thread_function)(void *), void *arguments);

线程的退出

单个线程可以通过3种方式退出:

  1. 线程可以简单的从启动函数中返回,返回值是线程的退出码。
  2. 线程可以被同一进程的其他线程取消。
  3. 线程调用pthread_exit函数退出。

pthread_exit函数有一个无类型的指针参数,表示此线程的返回值。进程中的其他线程可以通过调用pthread_join函数来获取这个返回值,调用线程将一直阻塞,直到指定的线程退出。

#include <pthread.h>
int pthread_exit (void *status);
int pthread_join (pthread_t thread, void **status_ptr);

线程同步

互斥量

互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后解锁。互斥量进行加锁后,任何其他试图再次对互斥量进行加锁的线程都会被阻塞直到当前线程释放该互斥锁。如果同时有多个线程被阻塞则这些线程都会被唤醒,第一个运行的线程会对互斥量加锁,其他线程就会看到互斥量依然是锁着的,只能再次被阻塞等待下次访问。

互斥量是用pthread_mutex_t数据类型表示的,在使用互斥量之前需要使用pthread_mutex_init函数对他进行初始化,在释放内存前需要调用pthread_mutex_destroy函数销毁互斥量。

#include <pthread.h>
int pthread_mutex_init (pthread_mutex_t *mut, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy (pthread_mutex_t *mut);

对互斥量进行加锁,需要调用pthread_mutex_lock函数,如果互斥量已经上锁调用线程将阻塞直到互斥量被解锁。对互斥量解锁,需要调用pthread_mutex_unlock函数。如果线程不希望被阻塞,他可以使用pthread_mutex_trylock函数尝试对互斥量进行加锁。如果此时互斥量处于未锁住状态,那么pthread_mutex_trylock将锁住互斥量并返回0,如果互斥量已经被锁住则会直接返回EBUSY而不会被阻塞。pthread_mutex_timedlock函数允许绑定线程阻塞时间,当达到超时时间时,此函数不会对互斥量进行加锁,而是返回错误码ETIMEDOUT

#include <pthread.h>
int pthread_mutex_lock (pthread_mutex_t *mut);
int pthread_mutex_unlock (pthread_mutex_t *mut);
int pthread_mutex_trylock (pthread_mutex_t *mut);

#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict abs_timeout);

读写锁

读写锁(reader-writer-lock)与互斥量类似,不过读写锁允许更高的并行性。因为并行读取一个公共变量时不会产生冲突,所以读写锁有三种状态:读模式下的加锁状态,写模式下的加锁状态,不加锁状态。一次只有一个线程可以占有写模式下的锁,但是读模式下的锁可以被多个线程占用。当锁在写加锁状态时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞。当读写锁在读加锁状态时,所有试图以读模式对他进行加锁的线程都可以得到访问权,但是任何希望以写模式对此锁进行加锁的线程都会阻塞,直到所有的线程释放它们的读锁为止。当读写锁处于读模式锁住的状态,而这时有一个线程试图以写模式获取锁时,读写锁通常会阻塞随后的读模式锁请求,这样可以避免读模式锁长期占用,而等待的写模式锁请求一直得不到满足。读写锁一般非常适用于对数据结构读的次数远大于写的情况。读写锁使用下面函数初始化和销毁:

#include <pthread.h> 
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); 
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

读模式下锁定读写锁,需要调用 pthread_rwlock_rdlock函数,写模式下锁定读写锁使用pthread_rwlock_wrlock函数。而锁的释放都使用同一个函数pthread_rwlock_unlock。读写锁也有不被阻塞而是返回ETIMEDOUT和带有超时返回的函数接口:

#include <pthread.h> 
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); 
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); 
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); 
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); 

#include <time.h> 
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr); 
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);

自旋锁

自旋锁与互斥量类似,但它不是通过休眠使进程阻塞,而是在获取锁之前一直处于忙等(自旋)阻塞状态。自旋锁可用于以下情况:锁被持有的时间短,而且线程并不希望在重新调度上花费太多的成本。当自旋锁用在非抢占式内核中时是非常有用的:除了提供互斥机制以外,它们会阻塞中断,这样中断处理程序就不会让系统陷入死锁状态,因为它需要获取已被加锁的自旋锁。在这种类型的内核中,中断处理程序不能休眠,因此它们能用的同步原语只能是自旋锁,但在用户层自旋锁并不是非常有用。自旋锁使用下面函数初始化和销毁以及加锁和解锁:

#include <pthread.h> 
int pthread_spin_init(pthread_spinlock_t *lock,int pshared); 
int pthread_spin_destroy(pthread_spinlock_t *lock); 

int pthread_spin_lock(pthread_spinlock_t *lock); 
int pthread_spin_trylock(pthread_spinlock_t *lock); 
int pthread_spin_unlock(pthread_spinlock_t *lock); 

条件变量

条件变量是线程可用的另一种同步机制。条件变量给多个线程提供了一个会合的场所。条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生。调用pthread_cond_wait函数等待条件变量时,把锁住的互斥量也传递给函数,此函数然后自动把调用线程放到等待条件的线程列表上,对互斥量解锁。这就关闭了条件检查和线程进入休眠状态等待条件改变这两个操作之间的时间通道,这样线程就不会错过条件的任何变化。pthread_cond_wait函数返回时,互斥量再次被锁住。示例:

#include <stdio.h> 
#include <unistd.h>  
#include <pthread.h>  
  
struct cond{  
    pthread_mutex_t mutex;  
    pthread_cond_t cond;  
    int count;
};  

struct cond sharedData;

void * produce(void *ptr)  
{  
    for (int i = 0; i < 6; ++i)  
    {  
        pthread_mutex_lock(&sharedData.mutex);  
        sharedData.count++;
        pthread_mutex_unlock(&sharedData.mutex);  

        printf("--- produce: sharedData.count = %d\n", sharedData.count);  
        /* 当 sharedData.count 大于等于2时,唤醒消费线程 */
        if (sharedData.count >= 2)  
            pthread_cond_signal(&sharedData.cond);  

        sleep(1);
    }  
}  
  
void * consume(void *ptr)  
{  
    for (int i = 0; i < 6;  ++i)  
    {  
        pthread_mutex_lock(&sharedData.mutex);  
        /* 当 sharedData.count 为0时,进入条件变量等待 */
        if (sharedData.count == 0)  
            pthread_cond_wait(&sharedData.cond, &sharedData.mutex);  
  
        sharedData.count--;
        pthread_mutex_unlock(&sharedData.mutex);  

        printf("consume: sharedData.count = %d\n", sharedData.count);  
    }  
}  
  
int main()  
{  
    pthread_t tid1, tid2;  

    /* 初始化互斥量和条件变量 */
    pthread_mutex_init(&sharedData.mutex, NULL);
    pthread_cond_init(&sharedData.cond, NULL);
    sharedData.count = 0;
  
    pthread_create(&tid2, NULL, produce, NULL);  
    pthread_create(&tid1, NULL, consume, NULL); 
  
    /* 主线程等待所有线程退出 */
    void *retVal;  
    pthread_join(tid1, &retVal);  
    pthread_join(tid2, &retVal);  

    /* 销毁互斥量和条件变量 */
    pthread_cond_destroy(&sharedData.cond);
    pthread_mutex_destroy(&sharedData.mutex);
  
    return 0;  
} 

/* 输出结果:
--- produce: sharedData.count = 1
--- produce: sharedData.count = 2
consume: sharedData.count = 1
consume: sharedData.count = 0
--- produce: sharedData.count = 1
--- produce: sharedData.count = 2
consume: sharedData.count = 1
consume: sharedData.count = 0
--- produce: sharedData.count = 1
--- produce: sharedData.count = 2
consume: sharedData.count = 1
consume: sharedData.count = 0

*/

屏障

屏障是用户协调多个线程并行工作的同步机制,屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。pthread_join函数就是一种屏障,允许一个线程等待,直到另一个线程退出。屏障允许任意数量的线程等待,直到所有的线程完成处理工作,而线程不需要退出。所有线程达到屏障后可以接着工作。
我的理解就是设置一个计数,在任务执行完成的地方调用pthread_barrier_wait函数增加计数并进入睡眠阻塞状态,当计数达到当初设置的值时说明所有任务都执行完成,然后唤醒所有任务继续往下执行。示例:

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

/* 屏障总数 */
#define BARRIER_NUM 4

/* 定义屏障 */
pthread_barrier_t barrier;

void err_exit(const char *err_msg)
{
    printf("error:%s\n", err_msg);
    exit(1);
}

void *thread_fun(void *arg)
{
    int num = (long int)arg;

    /* something work */
    printf("线程%d sleep %d s...\n", num, 4 - num);
    sleep(4 - num);

    printf("线程%d工作完成...\n", num);

    /* 等待屏障 */
    pthread_barrier_wait(&barrier);
    printf("线程%d返回...\n", num);

    return NULL;
}

int main(void)
{
    pthread_t tid_1, tid_2, tid_3;

    /* 初始化屏障 */
    pthread_barrier_init(&barrier, NULL, BARRIER_NUM);

    if (pthread_create(&tid_1, NULL, thread_fun, (void *)1) != 0)
        err_exit("create thread 1");

    if (pthread_create(&tid_2, NULL, thread_fun, (void *)2) != 0)
        err_exit("create thread 2");

    if (pthread_create(&tid_3, NULL, thread_fun, (void *)3) != 0)
        err_exit("create thread 3");

    /* 主线程等待工作完成 */
    pthread_barrier_wait(&barrier);
    printf("所有线程工作已完成...\n");
    /* 销毁屏障 */
    pthread_barrier_destroy(&barrier);
    sleep(1);

    return 0;
}

/* 输出结果:
线程3 sleep 1 s...
线程2 sleep 2 s...
线程1 sleep 3 s...
线程3工作完成...
线程2工作完成...
线程1工作完成...
所有线程工作已完成...
线程3返回...
线程2返回...
线程1返回...
*/

参考

本篇文章是读《Unix环境高级编程 第三版》中关于线程章节的读书笔记。


Comments

Content