zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Postgresql源码(83)执行器的结果接收系统——DestReceiver

postgresql源码系统 结果 接收 执行器 83
2023-06-13 09:13:36 时间

相关 《Postgresql源码(76)执行器专用元组格式TupleTableSlot》 《Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果》 《Postgresql源码(83)执行器的结果接收系统——DestReceiver》

0 总结

  • 执行器进入前会配置_DestReceiver(一套接口)
  • 执行器在内部跑一遍计划,产生一条tts(参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》),执行器调用接口函数receiveSlot,按当前接口定义输出到指定位置。
  • 例如
    • 【SELECT xxx】receiveSlot为printtup:把tts按列解析后拿到数据,按客户端服务端协议拼装包内容,调用libpq返回给客户端。
    • 【COPY TO 文件】receiveSlot为copy_dest_receive:把tts按列解析后拿到数据,按copy语法提供的分隔符组装,fwrite到文件中。
    • 【SPI】中receiveSlot为spi_printtup:把tts转换为HeapTuple格式,保存到SPI结果全局变量中。

1 概要

执行器的工作包括:work、get result,之前work的内容已经介绍过了,这里分析下执行器如何拿到执行结果。

  • 执行器会在多种场景下工作,例如:
    • SPI调用。
    • 常规客户端服务端的调用。
    • standalone backend调用(没有postmaster)。
    • 系统内部调用。
  • 对于上述场景,执行器的调用者有较大的差异,结果集无法使用一套函数返回。所以执行器设计了一套拿结果的函数钩子(接口),调用者需要将结果集的获取函数配置到接口上,执行器在执行中会把结果通过接口函数调入相应模块中,完成调用者所需的结果集构造,例如:
    • SPI的结果需要存放到执行的全局变量结构中。
    • 常规客户端服务端调用需要将结果用Libpq返回客户端。
    • standalone backend调用需要将结果打印到stdout。
    • 系统内部调用不需要返回结果。

PG的结果接收器提供了四个接口:

  • receiveSlot:输入执行器产生的tts,按指定格式输出
  • rStartup:初始化结果接收器
  • rShutdown:停止结果接收器
  • rDestroy:清理动态申请中间变量
struct _DestReceiver
{
	/* Called for each tuple to be output: */
	bool		(*receiveSlot) (TupleTableSlot *slot,
								DestReceiver *self);
	/* Per-executor-run initialization and shutdown: */
	void		(*rStartup) (DestReceiver *self,
							 int operation,
							 TupleDesc typeinfo);
	void		(*rShutdown) (DestReceiver *self);
	/* Destroy the receiver object itself (if dynamically allocated) */
	void		(*rDestroy) (DestReceiver *self);
	/* CommandDest code for this receiver */
	CommandDest mydest;
	/* Private fields might appear beyond this point... */
};

2 场景

第一组:正常客户端连接场景【DestRemote】

这一组函数接口由printtup_create_DR配置:

DestReceiver *
printtup_create_DR(CommandDest dest)
{
	DR_printtup *self = (DR_printtup *) palloc0(sizeof(DR_printtup));

	self->pub.receiveSlot = printtup;	/* might get changed later */
	self->pub.rStartup = printtup_startup;
	self->pub.rShutdown = printtup_shutdown;
	self->pub.rDestroy = printtup_destroy;
	self->pub.mydest = dest;

	/*
	 * Send T message automatically if DestRemote, but not if
	 * DestRemoteExecute
	 */
	self->sendDescrip = (dest == DestRemote);

	self->attrinfo = NULL;
	self->nattrs = 0;
	self->myinfo = NULL;
	self->buf.data = NULL;
	self->tmpcontext = NULL;

	return (DestReceiver *) self;
}

注意:这里的数据结构DR_printtup把DestReceiver有包装了 一层:

typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	Portal		portal;			/* the Portal we are printing from */
	bool		sendDescrip;	/* send RowDescription at startup? */
	TupleDesc	attrinfo;		/* The attr info we are set up for */
	int			nattrs;
	PrinttupAttrInfo *myinfo;	/* Cached info about each attr */
	StringInfoData buf;			/* output buffer (*not* in tmpcontext) */
	MemoryContext tmpcontext;	/* Memory context for per-row workspace */
} DR_printtup;

来看下这几个函数的工作位置和流程,例如:

select s::int, left(random()::text,4) l from generate_series(1,2) s;

-- 输出:两行、两列
 s |  l   
---+------
 1 | 0.55
 2 | 0.28

1 rStartup = printtup_startup:申请上下文,发送行描述符

位置

#0  printtup_startup (self=0x106dd50, operation=1, typeinfo=0x100d190) at printtup.c:113
#1  0x0000000000733ddf in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:350
#2  0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3  0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4  0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5  0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s\n;") at postgres.c:1213
#6  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9  0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程

static void
printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	DR_printtup *myState = (DR_printtup *) self;
	Portal		portal = myState->portal;

	/*
	 * Create I/O buffer to be used for all messages.  This cannot be inside
	 * tmpcontext, since we want to re-use it across rows.
	 */
	initStringInfo(&myState->buf);

申请"printtup"上下文,存放所有输出数据,

	myState->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
												"printtup",
												ALLOCSET_DEFAULT_SIZES);

启动时就需要发送行描述符:

	if (myState->sendDescrip)
		SendRowDescriptionMessage(&myState->buf,
								  typeinfo,
								  FetchPortalTargetList(portal),
								  portal->formats);
}

SendRowDescriptionMessage发送行描述符,入参:

List *targetlist
  TargetEntry: 
    {xpr = {type = T_TargetEntry}, 
     expr = 0x1086570, 
     resno = 1, 
     resname = 0xf5af88 "s", 
     ressortgroupref = 0, 
     resorigtbl = 0, 
     resorigcol = 0, 
     resjunk = false}
     
  TargetEntry: 
    {xpr = {type = T_TargetEntry}, 
     expr = 0x1086868, 
     resno = 2, 
     resname = 0xf5b5a0 "l", 
     ressortgroupref = 0, 
     resorigtbl = 0, 
     resorigcol = 0, 
     resjunk = false}

typeinfo
  {natts = 2, tdtypeid = 2249, tdtypmod = -1, tdrefcount = -1, constr = 0x0, attrs = 0x100d1a8}

流程:拼接输出串

void
SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo,
						  List *targetlist, int16 *formats)
{
	int			natts = typeinfo->natts;
	int			i;
	ListCell   *tlist_item = list_head(targetlist);

	/* tuple descriptor message type */
	pq_beginmessage_reuse(buf, 'T');
	/* # of attrs in tuples */
	pq_sendint16(buf, natts);

	/*
	 * Preallocate memory for the entire message to be sent. That allows to
	 * use the significantly faster inline pqformat.h functions and to avoid
	 * reallocations.
	 *
	 * Have to overestimate the size of the column-names, to account for
	 * character set overhead.
	 */
	enlargeStringInfo(buf, (NAMEDATALEN * MAX_CONVERSION_GROWTH /* attname */
							+ sizeof(Oid)	/* resorigtbl */
							+ sizeof(AttrNumber)	/* resorigcol */
							+ sizeof(Oid)	/* atttypid */
							+ sizeof(int16) /* attlen */
							+ sizeof(int32) /* attypmod */
							+ sizeof(int16) /* format */
							) * natts);

	for (i = 0; i < natts; ++i)
	{
		Form_pg_attribute att = TupleDescAttr(typeinfo, i);
		Oid			atttypid = att->atttypid;
		int32		atttypmod = att->atttypmod;
		Oid			resorigtbl;
		AttrNumber	resorigcol;
		int16		format;

		/*
		 * If column is a domain, send the base type and typmod instead.
		 * Lookup before sending any ints, for efficiency.
		 */
		atttypid = getBaseTypeAndTypmod(atttypid, &atttypmod);

		/* Do we have a non-resjunk tlist item? */
		while (tlist_item &&
			   ((TargetEntry *) lfirst(tlist_item))->resjunk)
			tlist_item = lnext(targetlist, tlist_item);
		if (tlist_item)
		{
			TargetEntry *tle = (TargetEntry *) lfirst(tlist_item);

			resorigtbl = tle->resorigtbl;
			resorigcol = tle->resorigcol;
			tlist_item = lnext(targetlist, tlist_item);
		}
		else
		{
			/* No info available, so send zeroes */
			resorigtbl = 0;
			resorigcol = 0;
		}

		if (formats)
			format = formats[i];
		else
			format = 0;

		pq_writestring(buf, NameStr(att->attname));
		pq_writeint32(buf, resorigtbl);
		pq_writeint16(buf, resorigcol);
		pq_writeint32(buf, atttypid);
		pq_writeint16(buf, att->attlen);
		pq_writeint32(buf, atttypmod);
		pq_writeint16(buf, format);
	}

	pq_endmessage_reuse(buf);
}

pq_endmessage_reuse

socket_putmessage(char msgtype, const char *s, size_t len)

(gdb) p s
$18 = 0x10724e0 ""
(gdb) p len
$19 = 42
(gdb) x/32bx 0x10724e0
0x10724e0:      0x00    0x02    0x73(s) 0x00    0x00    0x00    0x00    0x00
0x10724e8:      0x00    0x00    0x00    0x00    0x00    0x17    0x00    0x04
0x10724f0:      0xff    0xff    0xff    0xff    0x00    0x00    0x6c(l) 0x00
0x10724f8:      0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x1072500:      0x00    0x19    0xff    0xff    0xff    0xff    0xff    0xff
0x1072508:      0x00    0x00

2 receiveSlot = printtup

位置

#0  printtup (slot=0x100d2a8, self=0x106dd50) at printtup.c:303
#1  0x0000000000736102 in ExecutePlan (estate=0x100b630, planstate=0x100b868, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,direction=ForwardScanDirection, dest=0x106dd50, execute_once=true) at execMain.c:1582
#2  0x0000000000733e7d in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3  0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4  0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#5  0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50,qc=0x7ffd6b492890) at pquery.c:765
#6  0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#7  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#8  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#9  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#10 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#11 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#12 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程

printtup输入为tts包装的元组,和上面初始化后的DestReceiver。

static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
  printtup_prepare_info    // 拼接DR_printtup中的信息,准备发送
  MemoryContextSwitchTo    // 切换到"printtup"
  pq_beginmessage_reuse    // 调用libpq开始发数据
  pq_sendint16
  ...
  ...
  pq_endmessage_reuse

3 rShutdown = printtup_shutdown

位置

#0  printtup_shutdown (self=0x106dd50) at printtup.c:388
#1  0x0000000000733e98 in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2  0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3  0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4  0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5  0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#6  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9  0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程:清理工作

static void
printtup_shutdown(DestReceiver *self)
{
	DR_printtup *myState = (DR_printtup *) self;

	if (myState->myinfo)
		pfree(myState->myinfo);
	myState->myinfo = NULL;

	myState->attrinfo = NULL;

	if (myState->buf.data)
		pfree(myState->buf.data);
	myState->buf.data = NULL;

	if (myState->tmpcontext)
		MemoryContextDelete(myState->tmpcontext);
	myState->tmpcontext = NULL;
}

4 rDestroy = printtup_destroy

位置

#0  printtup_destroy (self=0x106dd50) at printtup.c:412
#1  0x0000000000976650 in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1221
#2  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#3  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#4  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#5  0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#6  0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#7  0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程:清理动态申请的:外层数据结构DR_printtup

static void
printtup_destroy(DestReceiver *self)
{
	pfree(self);
}

第二组:COPY数据场景【DestCopyOut】

这一组函数接口由CreateCopyDestReceiver配置:

DestReceiver *
CreateCopyDestReceiver(void)
{
	DR_copy    *self = (DR_copy *) palloc(sizeof(DR_copy));

	self->pub.receiveSlot = copy_dest_receive;
	self->pub.rStartup = copy_dest_startup;
	self->pub.rShutdown = copy_dest_shutdown;
	self->pub.rDestroy = copy_dest_destroy;
	self->pub.mydest = DestCopyOut;

	self->cstate = NULL;		/* will be set later */
	self->processed = 0;

	return (DestReceiver *) self;
}

注意copy也给DestReceiver包了一层:DR_copy

typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	CopyToState cstate;			/* CopyToStateData for the command */
	uint64		processed;		/* # of tuples processed */
} DR_copy;

来看下这几个函数的工作位置和流程,例如:

copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';

-- 输出:两行两列
1       0.74
2       0.09

1 rStartup = copy_dest_startup

2 receiveSlot = copy_dest_receive

位置

#0  copy_dest_receive (slot=0x1048538, self=0x10861c0) at copyto.c:1259
#1  0x0000000000736102 in ExecutePlan (estate=0x10468c0, planstate=0x1046af8, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,  direction=ForwardScanDirection, dest=0x10861c0, execute_once=true) at execMain.c:1582
#2  0x0000000000733e7d in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3  0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4  0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#5  0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#6  0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8,  queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
#7  0x000000000097e69b in ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:527
#8  0x000000000097d297 in PortalRunUtility (portal=0xff5050, pstmt=0xf5bef8, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, qc=0x7ffd6b492890)at pquery.c:1155
#9  0x000000000097d4fb in PortalRunMulti (portal=0xff5050, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:1312
#10 0x000000000097ca27 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:788
#11 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';")at postgres.c:1213
#12 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#13 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#14 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#15 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#16 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#17 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程

  • 同样也是拿到tts和DestReceiver
  • 由CopyOneRowTo调用CopySendEndOfRow调用fwrite写入文件
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
	DR_copy    *myState = (DR_copy *) self;
	CopyToState cstate = myState->cstate;

	/* Send the data */
	CopyOneRowTo(cstate, slot);

	/* Increment the number of processed tuples, and report the progress */
	pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
								 ++myState->processed);

	return true;
}

3 rShutdown = copy_dest_shutdown

位置

#0  copy_dest_shutdown (self=0x10861c0) at copyto.c:1279
#1  0x0000000000733e98 in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2  0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3  0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#4  0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#5  0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8,  queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false,  context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739

无操作

4 rDestroy = copy_dest_destroy

不执行,因为DestReceiver外面包的数据结构DR_copy没有什么需要释放的。

第三组:SPI获取数据场景【DestSPI】

这一组函数接口由CreateDestReceiver分发函数直接配置,注意前面两种都是走CreateDestReceiver入口进入自己的配置函数,但是SPI不同,直接在CreateDestReceiver里面配置:

DestReceiver *
CreateDestReceiver(CommandDest dest)
{
	/*
	 * It's ok to cast the constness away as any modification of the none
	 * receiver would be a bug (which gets easier to catch this way).
	 */

	switch (dest)
	{
		case DestRemote:
		case DestRemoteExecute:
			return printtup_create_DR(dest);

		case DestRemoteSimple:
			return unconstify(DestReceiver *, &printsimpleDR);

		case DestNone:
			return unconstify(DestReceiver *, &donothingDR);

		case DestDebug:
			return unconstify(DestReceiver *, &debugtupDR);

        // 这里配置 <-<-<-----------------------------
		case DestSPI:
			return unconstify(DestReceiver *, &spi_printtupDR);

		case DestTuplestore:
			return CreateTuplestoreDestReceiver();

		case DestIntoRel:
			return CreateIntoRelDestReceiver(NULL);

		case DestCopyOut:
			return CreateCopyDestReceiver();

		case DestSQLFunction:
			return CreateSQLFunctionDestReceiver();

		case DestTransientRel:
			return CreateTransientRelDestReceiver(InvalidOid);

		case DestTupleQueue:
			return CreateTupleQueueDestReceiver(NULL);
	}

	/* should never get here */
	pg_unreachable();
}

spi_printtupDR带四个函数:

static const DestReceiver spi_printtupDR = {
	spi_printtup, spi_dest_startup, donothingCleanup, donothingCleanup,
	DestSPI
};

SPI的结果不是直接返回给客户端的!SPI有自己的三个全局变量来指向结果集,SPI的接口函数会从全局变量中取值,组织后返回给客户端。(使用全局变量当接口的设计很差!)

uint64           SPI_processed = 0;     // 行数
SPITupleTable    *SPI_tuptable = NULL;  // 数据
int              SPI_result = 0;        // 执行结果

例子

直接执行:

```c
cat << EOF > spitest.c
#include "postgres.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "fmgr.h"

PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(sptest1);

Datum
sptest1(PG_FUNCTION_ARGS)
{
  char *sql10 = "select s::int, left(random()::text,4) l from generate_series(1,10) s";
  int ret;
  int proc;
  SPI_connect();
  ret = SPI_exec(sql10, 0);
  proc = SPI_processed;
  
  if (ret > 0 && SPI_tuptable != NULL)
  {
    SPITupleTable *tuptable = SPI_tuptable;
    TupleDesc tupdesc = tuptable->tupdesc;
    char buf[8192];
    uint64 j;
    for (j = 0; j < tuptable->numvals; j++)
    {
      HeapTuple tuple = tuptable->vals[j];
      int i;
      for (i = 1, buf[0] = 0; i <= tupdesc->natts; i++)
        snprintf(buf + strlen(buf), 
                 sizeof(buf) - strlen(buf), " %4s%4s",
                 SPI_getvalue(tuple, tupdesc, i),
                 (i == tupdesc->natts) ? " " : " |");
        elog(INFO, "%s", buf);
    }
  }
  
  
  SPI_finish();
  return (proc);
}
EOF

gcc -O0 -Wall -I /home/mingjiegao/dev/src/postgres/src/include -g -shared -fpic -o spitest.so spitest.c

psql执行:

postgres=# select sptest1();
INFO:      1   | 0.10    
INFO:      2   | 0.18    
INFO:      3   | 0.01    
INFO:      4   | 0.78    
INFO:      5   | 0.60    
INFO:      6   | 0.76    
INFO:      7   | 0.18    
INFO:      8   | 0.86    
INFO:      9   | 0.19    
INFO:     10   | 0.99    
 sptest1 
---------
      10
(1 row)

1 rStartup = spi_dest_startup

位置

sptest1
  SPI_exec
    SPI_execute
      _SPI_execute_plan
        _SPI_pquery
          ExecutorRun
            standard_ExecutorRun
              spi_dest_startup

流程

void
spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	SPITupleTable *tuptable;
	MemoryContext oldcxt;
	MemoryContext tuptabcxt;

	if (_SPI_current == NULL)
		elog(ERROR, "spi_dest_startup called while not connected to SPI");

	if (_SPI_current->tuptable != NULL)
		elog(ERROR, "improper call to spi_dest_startup");
  • 从"ExecutorState"切换到"SPI Proc"
  • 创建"SPI TupTable",切换到"SPI TupTable"
	oldcxt = _SPI_procmem();	/* switch to procedure memory context */

	tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
									  "SPI TupTable",
									  ALLOCSET_DEFAULT_SIZES);
	MemoryContextSwitchTo(tuptabcxt);

在"SPI TupTable"中申请SPITupleTable结构,由_SPI_current->tuptable记录:

SPITupleTable结构中有:

  • TupleDesc tupdesc;
  • HeapTuple *vals;
  • uint64 numvals;

记录结果集数据。

	_SPI_current->tuptable = tuptable = (SPITupleTable *)
		palloc0(sizeof(SPITupleTable));
	tuptable->tuptabcxt = tuptabcxt;
	tuptable->subid = GetCurrentSubTransactionId();

	/*
	 * The tuptable is now valid enough to be freed by AtEOSubXact_SPI, so put
	 * it onto the SPI context's tuptables list.  This will ensure it's not
	 * leaked even in the unlikely event the following few lines fail.
	 */
  • _SPI_connection中保存了slist_head tuptables;所有活跃的tuptable链表。
  • 申请128个HeapTupleData指针位置保存结果数据。
	slist_push_head(&_SPI_current->tuptables, &tuptable->next);

	/* set up initial allocations */
	tuptable->alloced = 128;
	tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
	tuptable->numvals = 0;
	tuptable->tupdesc = CreateTupleDescCopy(typeinfo);

	MemoryContextSwitchTo(oldcxt);
}

2 receiveSlot = spi_printtup

位置

sptest1
  SPI_exec
    SPI_execute
      _SPI_execute_plan
        _SPI_pquery
          ExecutorRun
            standard_ExecutorRun
              ExecutePlan
                spi_printtup

流程

bool
spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{
	SPITupleTable *tuptable;
	MemoryContext oldcxt;

	if (_SPI_current == NULL)
		elog(ERROR, "spi_printtup called while not connected to SPI");

tuptable还没赋值的状态: {tupdesc = 0x107ea90, vals = 0x107e678, numvals = 0, alloced = 128, tuptabcxt = 0x107e500, next = {next = 0x0}, subid = 1}

	tuptable = _SPI_current->tuptable;
	if (tuptable == NULL)
		elog(ERROR, "improper call to spi_printtup");
  • 切到"SPI TupTable"
  • 分配的128个位置不够用了?不够在申请256个。
	oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt);

	if (tuptable->numvals >= tuptable->alloced)
	{
		/* Double the size of the pointer array */
		uint64		newalloced = tuptable->alloced * 2;

		tuptable->vals = (HeapTuple *) repalloc_huge(tuptable->vals,
													 newalloced * sizeof(HeapTuple));
		tuptable->alloced = newalloced;
	}
  • 调用tts接口函数ExecCopySlotHeapTuple做元组拷贝,这里实际使用的是tts_virtual_copy_heap_tuple,参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》。
  • ExecCopySlotHeapTuple输入tts输出标准存储格式HeapTuple。
	tuptable->vals[tuptable->numvals] = ExecCopySlotHeapTuple(slot);
	(tuptable->numvals)++;

	MemoryContextSwitchTo(oldcxt);

	return true;
}

3 rShutdown = donothingCleanup

4 rDestroy = donothingCleanup