I have tried the solution described here, but it is not working.
My Dataflow pipeline reads in messages and connects to a PostgreSQL database to do some checking, so I created a psycopg2 threaded connection pool using the Singleton pattern. I use the singleton pattern because my Dataflow job autoscales: I want to keep the total number of connections under control, so that each Dataflow thread doesn't create its own connection pool and the total number of connections doesn't explode as the number of workers goes up.
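For reference, a lazily created, thread-safe singleton of the kind described above could look roughly like this. It is only a sketch: `get_pool` and `factory` are hypothetical names, and in the real pipeline `factory` would construct something like `psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, ...)`.

```python
import threading

# Process-wide pool instance, created lazily on first use (hypothetical names).
_pool = None
_pool_lock = threading.Lock()


def get_pool(factory):
    """Return the single pool for this process, building it with factory() once.

    In the pipeline, factory would be something like
    lambda: psycopg2.pool.ThreadedConnectionPool(1, 5, dsn=...)  (assumption).
    """
    global _pool
    if _pool is None:              # fast path: no locking once the pool exists
        with _pool_lock:
            if _pool is None:      # re-check: another thread may have built it
                _pool = factory()
    return _pool
```

Keep in mind that a module-level singleton is shared only by the threads of one Python process. Each Dataflow worker process gets its own pool, so the overall connection count is still roughly (number of worker processes) × `maxconn` and grows as the job scales out.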
I constantly get this 'trying to put unkeyed connection' error when sending a high volume of data to Dataflow.
Looking at the psycopg2 source code, I understand that this error is raised when the key for a connection is not found in the self._rused dictionary. The code is below:
```python
key = self._rused.get(id(conn))
if key is None:
    raise PoolError("trying to put unkeyed connection")
```
The key is generated when the _getconn method is called. The psycopg2 code is below:
```python
def _getkey(self):
    """Return a new unique key."""
    self._keys += 1
    return self._keys

def _getconn(self, key=None):
    """Get a free connection and assign it to 'key' if not None."""
    if self.closed:
        raise PoolError("connection pool is closed")
    if key is None:
        key = self._getkey()

    if key in self._used:
        return self._used[key]

    if self._pool:
        self._used[key] = conn = self._pool.pop()
        self._rused[id(conn)] = key
        return conn
    else:
        if len(self._used) == self.maxconn:
            raise PoolError("connection pool exhausted")
        return self._connect(key)
```
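To see which putconn() calls can trip that check, here is a deliberately simplified, hypothetical model of the pool's bookkeeping. `TinyPool` is not psycopg2's class: it keeps only `_pool`, `_used`, `_rused` and the key counter, starts with a fixed set of stand-in connections, and never opens new ones.

```python
class PoolError(Exception):
    pass


class TinyPool:
    """Simplified model of psycopg2's pool key bookkeeping (not the real class)."""

    def __init__(self, conns):
        self._pool = list(conns)  # idle connections
        self._used = {}           # key -> connection handed out
        self._rused = {}          # id(connection) -> key
        self._keys = 0

    def getconn(self):
        if not self._pool:
            raise PoolError("connection pool exhausted")
        self._keys += 1
        key = self._keys
        self._used[key] = conn = self._pool.pop()
        self._rused[id(conn)] = key   # remember which key this object belongs to
        return conn

    def putconn(self, conn):
        key = self._rused.get(id(conn))   # same lookup as the quoted code
        if key is None:
            raise PoolError("trying to put unkeyed connection")
        del self._used[key]               # in this model the entry is dropped on return
        del self._rused[id(conn)]
        self._pool.append(conn)


pool = TinyPool([object()])
conn = pool.getconn()
pool.putconn(conn)  # fine: conn was handed out by this pool

# None, an object the pool never handed out, and a second put of the same conn
for bad in (None, object(), conn):
    try:
        pool.putconn(bad)
    except PoolError as exc:
        print(exc)  # each iteration prints: trying to put unkeyed connection
```

In this model, anything whose `id()` is not currently registered in `_rused` fails the lookup, which includes `None`.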
My suspicion was that two threads call the _getkey method at the same time and get the same key back. When the first thread finishes, it removes that key and connection from the self._rused dictionary, so when the second thread finishes and tries to find the same key and connection in self._rused, the lookup fails and the error is raised.
But then I found that there is a lock in place when the _getconn method is called, so that theory doesn't hold.
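The lock is what rules out duplicate keys: once `self._keys += 1` and the read that follows happen while one lock is held, two threads cannot receive the same value. A minimal sketch of that idea (`KeyCounter` is a hypothetical stand-in, not psycopg2 code):

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class KeyCounter:
    """Hands out unique integer keys; the lock makes increment-and-read one step."""

    def __init__(self):
        self._keys = 0
        self._lock = threading.Lock()

    def next_key(self):
        with self._lock:
            self._keys += 1
            return self._keys


def take_keys(counter, n):
    """Grab n keys from the shared counter (runs in a worker thread)."""
    return [counter.next_key() for _ in range(n)]


counter = KeyCounter()
with ThreadPoolExecutor(max_workers=8) as ex:
    batches = list(ex.map(take_keys, [counter] * 8, [1000] * 8))
keys = [k for batch in batches for k in batch]
```

With the lock in place, all 8000 keys collected from the eight threads are distinct, which is consistent with the conclusion that duplicate keys are not the cause.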
Now I am totally clueless. Can anyone give me some thoughts on this? Thanks.
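You're seeing this error because you're passing None to the putconn() function: no connection registered in self._rused has the id of None, so self._rused.get(id(None)) returns None and PoolError("trying to put unkeyed connection") is raised. The source can be seen at: https://github.com/psycopg/psycopg2/blob/master/lib/pool.py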
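Following that answer, one pattern that guarantees putconn() only ever receives a connection that getconn() actually returned is a small context manager. This is a sketch: `pooled_connection` is a hypothetical helper, and `pool` is assumed to be your psycopg2 `ThreadedConnectionPool` (any object with `getconn()` and `putconn(conn)` works).

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from pool and always return that same connection.

    getconn() is called before the try block, so if it raises (for example
    "connection pool exhausted") there is nothing to give back and putconn()
    is never called with None.
    """
    conn = pool.getconn()
    try:
        yield conn
    finally:
        pool.putconn(conn)


# Anti-pattern (hypothetical): if getconn() raises, the finally clause runs
# putconn(None), which fails with "trying to put unkeyed connection".
#
#     conn = None
#     try:
#         conn = pool.getconn()
#         ...
#     finally:
#         pool.putconn(conn)
```

One way putconn(None) can sneak in under high load (an assumption about your code, since it isn't shown) is exactly that anti-pattern: when the pool runs out, getconn() raises "connection pool exhausted", the finally clause passes None, and the unkeyed-connection error is raised on top of the original one.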