深入理解Python的TLS机制和Threading.local()

简介: 1.背景介绍我之前写过一个关于Python的TLS机制的浅浅析,大家可以参考这个文章,首先,我们再来熟悉熟悉什么是TLS机制。1.1 Thread Local Storage(线程局部存储)这个概念最早是相对于全局变量来说的,就是我们在编程的时候,会涉及到希望所有线程都能够共享访问同一个变量,在 Python/Go/C 中,我们就可以定义一个全局变量,这样Global Variable 对多个线程就是可见的,因为同一个进程所有线程共享地址空间,大家都可以操作。

1.背景介绍
我之前写过一个关于Python的TLS机制的浅浅析,大家可以参考这个文章,首先,我们再来熟悉熟悉什么是TLS机制。

1.1 Thread Local Storage(线程局部存储)
这个概念最早是相对于全局变量来说的,就是我们在编程的时候,会涉及到希望所有线程都能够共享访问同一个变量,在 Python/Go/C 中,我们就可以定义一个全局变量,这样Global Variable 对多个线程就是可见的,因为同一个进程所有线程共享地址空间,大家都可以操作。例如,一个全局的配置变量或单实例对象,所有线程就可以很方便访问了,但是仅仅这样有一个前提,就是这个变量的并发操作必须是幂等的,读写不影响我们程序的正确性。但是往往多线程共同操作一个全局变量,就会影响程序的正确性,因此我们必须枷锁,比如经典的并发加操作。

import threading
count = 0
lock = threading.RLock()
def inc():

global count, lock
with lock:
    count += 1

上面那个例子很多博客用来做ThreadLocal变量的讲解,实际上我觉得是有误导的,不恰当的。因为这种共享变量,你必须枷锁,因为他的目的就是为了大家一起去更新一个共享变量,多线程环境下必须枷锁。就算你使用ThreadLocal替换也没用,ThreadLocal能替换这个Count变量让所有线程单独存储一份么,不满足需求。你单独存一份,更改之后还得把结果再次写回到全局变量去更新,那写会的过程还是得枷锁。除非使用Golang中的单Channel更新机制,才能避免枷锁。

所以ThreadLocal变量使用强调的侧重点不在这里,更多的是在编程范式上面。其实就是有些时候,我们某个变量类型很多函数或者类都需要用,但是我又不想写死在代码里,每次传递参数都要传递这个类或者变量,因为一旦这个类发生类型上的变化,可能对于静态类型的语言,很多地方就得修改参数,而且这种变量一直在程序代码的参数传递中层层出现,你如果写过代码就会有感觉,有时候你设计的函数API好像一层层的得把一个参数传递进去,即使某些层好像用不到这个参数。

def getMysqlConn(passwd, db, host="localhost", port=3306, user="root", charset='utf8'):

conn = MySQLdb.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
return conn

def func1(zzz, passwd, db, host="localhost", port=3306, user="root", charset='utf8'):

conn = getMysqlConn(passwd, db, host, port, user, charset)
...

def func2(xxx,yyy,zzz, passwd, db, host="localhost", port=3306, user="root", charset='utf8'):

...
func1(zzz,passwd,db,host,port,user,charset)

上面的代码你可能会疯掉。那么你可能就考虑想把这个参数提出来,当成全局变量算了,哪一层用到了直接用就好了,不能让他无缘无故的不停的被当成局部变量传参。文章Alternatives to global variables and passing the same value over a long chain of calls描述了这个问题,但是这个时候出现的问题就是,可能其他代码线程会不可控的更改这个变量,导致你的程序发生未知错误。你把这种参数变成全局的暴露出来,那么基于的假设就是该参数不会被随意修改!一旦这个假设崩塌,你的程序可能会发生灾难后果。这不符合软件设计的开闭原则。所以我们使用TLS技术化解这种矛盾。

那么我们就设计了一种方案,就是有这样一种变量,他是全局的,但是每个线程在访问的时候都会存储一份成为自己的局部变量,修改就不会相互影响了。比如 Linux/Unix的 C 程序库 libc的全局变量errno, 这个其实就是TLS的例子。当系统调用从内核空间返回用户空间时,如果系统调用出错,那么便设置errno的值为一个负值,这样就不需要每次在函数内部定义局部变量。但是当多线程的概念和技术被提出后,这套机制就不再适用了,可以使用局部变量,但是不太可能去更改已有的代码了,比较好的解决方案是让每个线程都有自己的errno。实际上,现在的C库函数不是把出错代码写入全局量errno,而是通过一个函数__errno_location()获取一个地址,再把出错代码写入该地址,其意图就是让不同的线程使用不同的出错代码存储地点,而errno,现在一般已经变成了一个宏定义。每一个线程都会维护自己的一份,修改不影响其他线程。

这是不是意味着ThreadLocal对象不用枷锁了? 其实这个ThreadLocal和同步没有关系,他仅仅是提供了一种方便每个线程快速访问变量的方式,但是如果这个对象本身有些共享状态需要大家一起维护(比如Count++),你就必须枷锁,尽管每个线程操作的是ThreadLocal副本。维基百科上有以下原话:

A second use case would be multiple threads accumulating information into a global variable. To avoid a race condition, every access to this global variable would have to be protected by a mutex. Alternatively, each thread might accumulate into a thread-local variable (that, by definition, cannot be read from or written to from other threads, implying that there can be no race conditions). Threads then only have to synchronise a final accumulation from their own thread-local variable into a single, truly global variable.

比如我们写了一个共享的Manager类,这个类可能是用来做数据库连接,网络连接或者其他的做底层管理功能。我们有很多线程需要使用这个Manager的某些功能,并且这种类不是用来表示一种状态,供所有线程并发修改其状态并将最终修改的结果表现在该类上面(上面count的例子)。Manager只是可以提供给线程使用某些功能,然后每个线程可以把这个Manager复制一份成为自己的局部变量,自己可以随意修改,但是不会影响到其他线程,因为是复制的一份。但是如果你需要让管理器记录所有的连接操作次数,那么多线程对立面的某些变量访问比如Count就需要枷锁了。

2.TLS 在Python中的运用和实现
2.1 简单使用
ThreadLocal不仅仅可以解决全局变量访问冲突,其实还有其他好处,在PEP266中有提到,ThreadLocal变量是可以减少指令加速运算的,因为全局变量往往需要更多的指令(需要for loop)来做查询访问,而ThreadLocal 之后,有了索引表,直接可以一条指令找到这个对象。

import threading

userName = threading.local()

def SessionThread(userName_in):

userName.val = userName_in
print(userName.val)   

Session1 = threading.Thread(target=SessionThread("User1"))
Session2 = threading.Thread(target=SessionThread("User2"))

start the session threads

Session1.start()
Session2.start()

wait till the session threads are complete

Session1.join()
Session2.join()
上述Threadlocal的实现原理类似有一个全局的词典,词典的key是线程id,value就是共享的全局变量的副本。每次访问全局变量的时候,你访问到的其实是副本,只是Python使用黑魔法帮我们屏蔽了这个userName.val 的访问细节,其实他访问的是词典中的对应线程所拥有的对象副本。

2.2 实现源码分析
all = ["local"]
class _localbase(object):

__slots__ = '_local__key', '_local__args', '_local__lock'

def __new__(cls, *args, **kw):
    # 新建一个类对象
    self = object.__new__(cls)
    # 在主线程中初始化这个这个全局对象的某些属性,比如 `_local__key`, 这个key以后会用作其他线程使用全局变量副本的查询依据,以后每个线程都会根据这个key来查找自己的局部副本数据
    key = '_local__key', 'thread.local.' + str(id(self))
    object.__setattr__(self, '_local__key', key)
    object.__setattr__(self, '_local__args', (args, kw))
    # 多线程会并发设置全局变量的属性,这时候会并发访问设置属性,因此需要一把全局锁,进行互斥操作
    object.__setattr__(self, '_local__lock', RLock())

    if (args or kw) and (cls.__init__ is object.__init__):
        raise TypeError("Initialization arguments are not supported")

    # We need to create the thread dict in anticipation of
    # __init__ being called, to make sure we don't call it
    # again ourselves.
    dict = object.__getattribute__(self, '__dict__')
    current_thread().__dict__[key] = dict

    return self

def _patch(self):

# 拿到全局的key
key = object.__getattribute__(self, '_local__key')
# 在当前线程中根据key找到线程的私有数据副本,并替换掉 ThreadLocal自己的__dict__属性。如果没有,就创建一个,并添加
d = current_thread().__dict__.get(key)
if d is None:
    d = {}
    # 线程还没得私有数据副本,创建一个并加入线程自己的属性中
    current_thread().__dict__[key] = d
    # 替换ThreadLocal的__dict__为当前线程的私有数据词典d
    object.__setattr__(self, '__dict__', d)

    # we have a new instance dict, so call out __init__ if we have
    # one
    # 这段的意思其实是,如果原来的全局变量ThreadLocal 本身有一些其他的属性和数据,那么直接替换掉一个新dict之后,以前的数据就丢失了,这里我们必须初始化以前的数据到新dict中
    cls = type(self)
    if cls.__init__ is not object.__init__:
        args, kw = object.__getattribute__(self, '_local__args')
        cls.__init__(self, *args, **kw)
else:
    object.__setattr__(self, '__dict__', d)

class local(_localbase):

def __getattribute__(self, name):
    lock = object.__getattribute__(self, '_local__lock')
    lock.acquire()
    try:
        _patch(self)
        return object.__getattribute__(self, name)
    finally:
        lock.release()

def __setattr__(self, name, value):
    if name == '__dict__':
        raise AttributeError(
            "%r object attribute '__dict__' is read-only"
            % self.__class__.__name__)
    # 拿到早已经在主线程设置的共享的一把锁
    lock = object.__getattribute__(self, '_local__lock')
    lock.acquire()
    try:
        _patch(self)# 关键代码,这个patch会导致 Threadlocal 这个数据的__dict__直接被换成了所在线程自己的私有数据, Python 里面有很多这种patch的替换手段,就是直接把基础库的某些功能和函数直接替换成了第三方库的比如monkey patch
        # 再次设置属性的时候,设置的__dict__ 其实不是 Threadlocal 自己的属性了,是而是当前所在线程的__dict__的某一个key-value 副本数据的value,这个value是一个dict
        # object 的setattr默认行为其实就是在自己的__dict__对象中添加一对key-pair,但是现在他的__dict__已经更换成所在线程的一个数据副本词典了,黑魔法替换就在这里
        return object.__setattr__(self, name, value)
    finally:
        lock.release()

def __delattr__(self, name):
    if name == '__dict__':
        raise AttributeError(
            "%r object attribute '__dict__' is read-only"
            % self.__class__.__name__)
    lock = object.__getattribute__(self, '_local__lock')
    lock.acquire()
    try:
        _patch(self)
        return object.__delattr__(self, name)
    finally:
        lock.release()

def __del__(self):
    import threading

    key = object.__getattribute__(self, '_local__key')

    try:
        # We use the non-locking API since we might already hold the lock
        # (__del__ can be called at any point by the cyclic GC).
        threads = threading._enumerate()
    except:
        # If enumerating the current threads fails, as it seems to do
        # during shutdown, we'll skip cleanup under the assumption
        # that there is nothing to clean up.
        return

    for thread in threads:
        try:
            __dict__ = thread.__dict__
        except AttributeError:
            # Thread is dying, rest in peace.
            continue

        if key in __dict__:
            try:
                del __dict__[key]
            except KeyError:
                pass # didn't have anything in this thread

from threading import current_thread, RLock

data = local()
print (data.__dict__)
def t(x):

global data
data.x = x
data.y = 1
print (current_thread().__dict__)
print (data.__dict__)

t1 = threading.Thread(target=t, args = (777,))
t2 = threading.Thread(target=t, args = (888,))
print current_thread().__dict__
t1.start()
t2.start()
t1.join()
t2.join()
print(data.__dict__)
关键技术就在patch上面,Python 里面有很多这种patch的替换手段,就是直接把基础库的某些功能和函数直接替换成了第三方库的比如monkey patch. 再次设置属性的时候,设置的 dict 其实不是ThreadLocal自己的,是而是当前所在线程的__dict__ 的某一个key-value 副本数据,key 就是线程访问的某个TLS变量生成的(一个线程可以有很多TLS变量,每个有不同的key),value是一个dict. object的 setattr默认行为其实就是在自己的__dict__对象中添加一对key-pair,但是现在他的__dict__已经更换成所在线程的一个数据副本词典dict了,黑魔法替换就在这里.

下面的例子展示了Python黑魔法的一个替换词典的方式,可以运行看看

class A:

def substitute(self, d):
    object.__setattr__(self, '__dict__', d)

a = A()
a.y = 3
old_dict = a.__dict__
print(old_dict)
d = {'x':1}
a.substitute(d)
print(a.__dict__)
a.y = 777
print(a.__dict__)
print(d)

OUTPUT

{'y': 3}
{'x': 1}
{'x': 1, 'y': 777}
{'x': 1, 'y': 777}
如果A本身已经含有一些数据,那就不能简单的直接复制了,还需要初始化以前的数据填充新的词典,这也是在源码中看到的。

from threading import current_thread
class A:

def __new__(cls, *args, **kw):
    self = object.__new__(cls)
    setattr(cls, '_local__args', (args, kw))
    return self

def __init__(self, *args, **kw):
    self.shared_x = kw["shared_x"]
    self.shared_y = kw["shared_y"]
def substitute(self, d):
    object.__setattr__(self, '__dict__', d)
    cls = type(self)
    if cls.__init__ is not object.__init__:
        print("7---------------")
        args, kw = getattr(self, '_local__args')
        cls.__init__(self, *args, **kw)

a = A(shared_x=111, shared_y=222)
a.y = 3
old_dict = a.__dict__
print(old_dict)
d = {'x':1}
a.substitute(d)
print(a.__dict__)
a.y = 777
print(a.__dict__)
print(d)
print(old_dict)
下图就是访问每个线程访问过程,实际上操作的是线程自己的私有数据副本。同时需要注意的还是那句话,使用 ThreadLocal对象不意味着你的程序不需要再枷锁,比如这个 ThreadLocal 对象可能又引用了其他共享状态的对象,那么就要对这个共享状态对象的操作进行枷锁实现同步和互斥。

ThreadLocal 实现过程
3 TLS 在Java 中的运用和实现
3.1 简单使用
public class ThreadLocalExample {

public static class MyRunnable implements Runnable {

    private ThreadLocal threadLocal = new ThreadLocal();

    @Override
    public void run() {
        threadLocal.set((int) (Math.random() * 100D));
        try {
        Thread.sleep(2000);
        } catch (InterruptedException e) {

        }
        System.out.println(threadLocal.get());
    }
}

public static void main(String[] args) {
     MyRunnable sharedRunnableInstance = new MyRunnable();
     Thread thread1 = new Thread(sharedRunnableInstance);
     Thread thread2 = new Thread(sharedRunnableInstance);
     thread1.start();
     thread2.start();
}

}
3.2 源码实现
有了Python版本的分析,Java版本就不再多做解释,感兴趣的可以看看源码,实现原理肯定都是大同小异,只是语言上的差异,导致 Java 不可能像Python这种动态类型语言一样灵活。

需要每个线程都维护一个 key-value 集合数据结构,记录每个线程访问到的 TLS 变量副本,这样每个线程可以根据 key 来找到相应的 TLS副本数据,对副本数据进行真实的操作,而不是TLS全局变量或者静态类(Java中)。在Python中直接很简单的使用了动态数据绑定的词典数据结构,在Java中稍显麻烦,需要实现一个类似Map的结构,ThreadLocal.get() 方法其实本质上也是和Python中一样,先获取当前线程自己的ThreadLocalMap对象(就是每个线程维护的TLS key-value集合啦)。再从ThreadLocalMap对象中找出当前的ThreadLocal变量副本,和HashMap一样的采用了链地址法的hash结构。可以参考文章Java 多线程(7): ThreadLocal 的应用及原理。Java 里一般是采用泛型规定你共享的变量类型,然后每个线程维护该变量的副本。

  1. 小结
    TLS技术的使用和属性:

解决多线程编程中的对同一变量的访问冲突的一种技术,TLS会为每一个线程维护一个和该线程绑定的变量的副本。而不是无止尽的传递局部参数的方式编程。
每一个线程都拥有自己的变量副本,并不意味着就一定不会对TLS变量中某些操作枷锁了。
Java平台的java.lang.ThreadLocal和Python 中的threading.local()都是TLS技术的一种实现,。
TLS使用的缺陷是,如果你的线程都不退出,那么副本数据可能一直不被GC回收,会消耗很多资源,比如线程池中,线程都不退出,使用TLS需要非常小心。
TLS技术的实现原理:
需要每个线程都维护一个 key-value集合数据结构,记录每个线程访问到的 TLS变量副本,这样每个线程可以根据 key来找到相应的 TLS副本数据,对副本数据进行真实的操作,而不是TLS全局变量或者静态类(Java中).

TLS变量自己会根据当前调用他的Thread对象,根据Thread对象得到该线程维护的 TLS 副本集合,然后进一步根据当前TLS的key,查到到key对一个的TLS副本数据。这样就给每个线程造成一种假象,以为大家可以同时更新一个全局共享变量或者静态类对象。

相关文章
|
3月前
|
测试技术 API 数据库
Python反射机制在实际场景中的应用
Python 的反射机制是指在运行时动态地访问、检测和修改类和对象的属性和方法。:通过反射机制,可以动态加载和执行插件,无需在代码中硬编码每个插件的具体实现。这样可以实现插件化架构,使系统更加灵活和可扩展。:可以使用反射机制来读取和解析配置文件中的配置项,并动态地应用到程序中。这样可以实现灵活的配置管理,方便根据需要进行配置项的修改和扩展。:在自动化测试框架中,可以利用反射机制动态地加载和执行测试用例,从而实现自动化测试的灵活性和扩展性。
38 2
|
9天前
|
消息中间件 安全 Kafka
Python IPC机制全攻略:让进程间通信变得像呼吸一样自然
【9月更文挑战第12天】在编程领域,进程间通信(IPC)是连接独立执行单元的关键技术。Python凭借简洁的语法和丰富的库支持,提供了多种IPC方案。本文将对比探讨Python的IPC机制,包括管道与消息队列、套接字与共享内存。管道适用于简单场景,而消息队列更灵活,适合高并发环境。套接字广泛用于网络通信,共享内存则在本地高效传输数据。通过示例代码展示`multiprocessing.Queue`的使用,帮助读者理解IPC的实际应用。希望本文能让你更熟练地选择和运用IPC机制。
32 10
|
11天前
|
消息中间件 安全 数据库
动手实操!Python IPC机制,打造高效协同的进程军团
【9月更文挑战第10天】在软件开发领域,进程间的高效协作对应用性能与稳定性至关重要。Python提供了多种进程间通信(IPC)机制,如管道、消息队列、套接字、共享内存等,帮助开发者构建高效协同的系统。本文将通过动手实践,使用`multiprocessing`模块演示如何利用队列实现进程间通信。示例代码展示了如何创建一个工作进程从队列接收并处理数据,从而实现安全高效的进程交互。通过实际操作,读者可以深入了解Python IPC的强大功能,提升系统的并发处理能力。
23 0
|
1月前
|
开发者 Python
Python中的异常处理机制及其实践
【8月更文挑战第12天】Python的异常处理机制通过`try`和`except`结构显著提高了程序的稳定性和可靠性。在`try`块中执行可能引发异常的代码,如果发生异常,控制权将转移到与该异常类型匹配的`except`块。此外,还可以通过`else`处理无异常的情况,以及使用`finally`确保某些代码无论如何都会被执行,非常适合进行清理工作。这种机制允许开发者精确地捕捉和管理异常,从而提升程序的健壮性和可维护性。同时,Python还支持定义自定义异常,进一步增强了错误处理的灵活性。
39 4
|
1月前
|
监控 测试技术 数据库
Python自动化测试之异常处理机制
总体而言,妥善设计的异常处理策略让自动化测试更加稳定和可靠,同时也使得测试结果更加清晰、易于理解和维护。在设计自动化测试脚本时,务必考虑到异常处理机制的实现,以保证测试过程中遇到意外情况时的鲁棒性和信息的有效传达。
37 2
|
1月前
|
消息中间件 安全 Kafka
Python IPC机制全攻略:让进程间通信变得像呼吸一样自然
【8月更文挑战第2天】在编程领域中,进程间通信(IPC)作为连接独立运行单元的关键桥梁,其重要性不言而喻。本文以Python为例,深入探讨了IPC的各种机制。首先对比了管道与消息队列:管道作为一种基础IPC机制,适用于简单场景;而消息队列通过第三方库如RabbitMQ或Kafka支持更复杂的多生产者多消费者模型,具备高并发处理能力。
33 1
|
2月前
|
JavaScript 前端开发 网络协议
从理论到实践:全面剖析Python Web应用中的WebSocket实时通信机制
【7月更文挑战第17天】WebSocket在实时Web应用中扮演重要角色,提供全双工通信,减少延迟。本文详述了Python中使用`websockets`库创建服务器的步骤,展示了一个简单的echo服务器示例,监听8765端口,接收并回显客户端消息。客户端通过JavaScript与服务器交互,实现双向通信。了解WebSocket的握手、传输和关闭阶段,有助于开发者有效利用WebSocket提升应用性能。随着实时需求增长,掌握WebSocket技术至关重要。
210 6
|
2月前
|
程序员 开发者 Python
Python动态属性与反射机制方式
通过反射和动态属性,Python程序员获得了巨大的权能,能在运行时访问、修改或为对象新增属性和方法,显著提高编程的智能化和适应性。内置的反射机制可能使开发者跨越编写代码时的限制,通过名称访问对象的特性、方法以及其他成员,为创建一个具有高度配置性、扩展性强大的应用程序打下基础。此外,利用getattr和setattr函数来获取和设定对象的属性,或是利用hasattr确认其是否存在某属性,甚至可以通过名字来动态地执行对象的函数。 总之,反射和动态属性对于Python的程序开发而言是重要的工具,它们不仅提供了编写效率高且灵活的代码的能力,还为构建可高度定制和扩展的应用程序提供了可能。对于熟练掌握这些
|
2月前
|
Linux 网络安全 开发者
【Python】已解决:WARNING: pip is configured with locations that require TLS/SSL, however the ssl module i
【Python】已解决:WARNING: pip is configured with locations that require TLS/SSL, however the ssl module i
234 3
|
1月前
|
消息中间件 安全 数据库
动手实操!Python IPC机制,打造高效协同的进程军团
【8月更文挑战第2天】在 software development 领域, Python 的进程间通信 (IPC) 能力对应用性能与稳定性至关重要。Python 提供了多样化的 IPC 机制, 如管道、消息队列、套接字、共享内存等, 每种都有独特优势。本文以动手实践为主, 使用 `multiprocessing` 模块演示 IPC 的实现。示例代码展示了如何利用 `Queue` 在进程间安全高效地传输数据。
37 0