From 3da5deacfbe2de73f1fe11274b223487e1b606ac Mon Sep 17 00:00:00 2001 From: Laurenz Albe Date: Thu, 18 Apr 2019 00:21:04 +0200 Subject: [PATCH] Add support for COPY This capability was introduced to PostgreSQL in commit 3d956d9562aa4811b5eaaaf5314d361c61be2ae0 in v11. Attempting COPY on a foreign table crashed PostgreSQL before this patch, as reported by "jkldv" in #309. This patch is still incomplete: - too little testing - crashes if there is an AFTER INSERT trigger - no regression tests yet --- oracle_fdw.c | 350 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 242 insertions(+), 108 deletions(-) diff --git a/oracle_fdw.c b/oracle_fdw.c index 71d7f39..e70ce15 100644 --- a/oracle_fdw.c +++ b/oracle_fdw.c @@ -308,6 +308,10 @@ static void oracleReScanForeignScan(ForeignScanState *node); static void oracleAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); static List *oraclePlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index); static void oracleBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, int eflags); +#if PG_VERSION_NUM >= 110000 +static void oracleBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *rinfo); +static void oracleEndForeignInsert(EState *estate, ResultRelInfo *rinfo); +#endif /*PG_VERSION_NUM */ static TupleTableSlot *oracleExecForeignInsert(EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); static TupleTableSlot *oracleExecForeignUpdate(EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); static TupleTableSlot *oracleExecForeignDelete(EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); @@ -369,6 +373,9 @@ static void oracleDie(SIGNAL_ARGS); static char *setSelectParameters(struct paramDesc *paramList, ExprContext *econtext); static void convertTuple(struct OracleFdwState *fdw_state, Datum *values, bool *nulls, bool trunc_lob); static void errorContextCallback(void *arg); +static bool hasTrigger(Relation rel, CmdType cmdtype); +static void buildInsertQuery(StringInfo sql, struct OracleFdwState *fdwState); +static void appendReturningClause(StringInfo sql, struct OracleFdwState *fdwState); #ifdef IMPORT_API static char *fold_case(char *name, fold_t foldcase); #endif /* IMPORT_API */ @@ -407,6 +414,10 @@ oracle_fdw_handler(PG_FUNCTION_ARGS) fdwroutine->AddForeignUpdateTargets = oracleAddForeignUpdateTargets; fdwroutine->PlanForeignModify = oraclePlanForeignModify; fdwroutine->BeginForeignModify = oracleBeginForeignModify; +#if PG_VERSION_NUM >= 110000 + fdwroutine->BeginForeignInsert = oracleBeginForeignInsert; + fdwroutine->EndForeignInsert = oracleEndForeignInsert; +#endif /*PG_VERSION_NUM */ fdwroutine->ExecForeignInsert = oracleExecForeignInsert; fdwroutine->ExecForeignUpdate = oracleExecForeignUpdate; fdwroutine->ExecForeignDelete = oracleExecForeignDelete; @@ -1161,9 +1172,8 @@ ForeignScan rel = heap_open(foreigntableid, NoLock); /* is there an AFTER trigger FOR EACH ROW? */ - has_trigger = (foreignrel->relid == root->parse->resultRelation) && rel->trigdesc - && ((root->parse->commandType == CMD_UPDATE && rel->trigdesc->trig_update_after_row) - || (root->parse->commandType == CMD_DELETE && rel->trigdesc->trig_delete_after_row)); + has_trigger = (foreignrel->relid == root->parse->resultRelation) + && hasTrigger(rel, root->parse->commandType); heap_close(rel, NoLock); @@ -1598,7 +1608,6 @@ oraclePlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelati int attnum, i; ListCell *cell; bool has_trigger = false, firstcol; - struct paramDesc *param; char paramName[10]; TupleDesc tupdesc; Bitmapset *tmpset; @@ -1657,7 +1666,7 @@ oraclePlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelati } /* is there a row level AFTER trigger? */ - has_trigger = rel->trigdesc && rel->trigdesc->trig_insert_after_row; + has_trigger = hasTrigger(rel, CMD_INSERT); break; case CMD_UPDATE: @@ -1676,13 +1685,13 @@ oraclePlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelati } /* is there a row level AFTER trigger? */ - has_trigger = rel->trigdesc && rel->trigdesc->trig_update_after_row; + has_trigger = hasTrigger(rel, CMD_UPDATE); break; case CMD_DELETE: /* is there a row level AFTER trigger? */ - has_trigger = rel->trigdesc && rel->trigdesc->trig_delete_after_row; + has_trigger = hasTrigger(rel, CMD_DELETE); break; default: @@ -1755,55 +1764,7 @@ oraclePlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelati switch (operation) { case CMD_INSERT: - appendStringInfo(&sql, "INSERT INTO %s (", fdwState->oraTable->name); - - firstcol = true; - for (i = 0; i < fdwState->oraTable->ncols; ++i) - { - /* don't add columns beyond the end of the PostgreSQL table */ - if (fdwState->oraTable->cols[i]->pgname == NULL) - continue; - - if (firstcol) - firstcol = false; - else - appendStringInfo(&sql, ", "); - appendStringInfo(&sql, "%s", fdwState->oraTable->cols[i]->name); - } - - appendStringInfo(&sql, ") VALUES ("); - - firstcol = true; - for (i = 0; i < fdwState->oraTable->ncols; ++i) - { - /* don't add columns beyond the end of the PostgreSQL table */ - if (fdwState->oraTable->cols[i]->pgname == NULL) - continue; - - /* check that the data types can be converted */ - checkDataType( - fdwState->oraTable->cols[i]->oratype, - fdwState->oraTable->cols[i]->scale, - fdwState->oraTable->cols[i]->pgtype, - fdwState->oraTable->pgname, - fdwState->oraTable->cols[i]->pgname - ); - - /* add a parameter description for the column */ - snprintf(paramName, 9, ":p%d", fdwState->oraTable->cols[i]->pgattnum); - addParam(&fdwState->paramList, paramName, fdwState->oraTable->cols[i]->pgtype, - fdwState->oraTable->cols[i]->oratype, i); - - /* add parameter name */ - if (firstcol) - firstcol = false; - else - appendStringInfo(&sql, ", "); - - appendAsType(&sql, paramName, fdwState->oraTable->cols[i]->pgtype); - } - - appendStringInfo(&sql, ")"); + buildInsertQuery(&sql, fdwState); break; case CMD_UPDATE: @@ -1892,57 +1853,7 @@ oraclePlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelati } } - /* add RETURNING clause if appropriate */ - firstcol = true; - for (i=0; ioraTable->ncols; ++i) - if (fdwState->oraTable->cols[i]->used) - { - if (firstcol) - { - firstcol = false; - appendStringInfo(&sql, " RETURNING "); - } - else - appendStringInfo(&sql, ", "); - appendStringInfo(&sql, "%s", fdwState->oraTable->cols[i]->name); - } - - /* add the parameters for the RETURNING clause */ - firstcol = true; - for (i=0; ioraTable->ncols; ++i) - if (fdwState->oraTable->cols[i]->used) - { - /* check that the data types can be converted */ - checkDataType( - fdwState->oraTable->cols[i]->oratype, - fdwState->oraTable->cols[i]->scale, - fdwState->oraTable->cols[i]->pgtype, - fdwState->oraTable->pgname, - fdwState->oraTable->cols[i]->pgname - ); - - /* create a new entry in the parameter list */ - param = (struct paramDesc *)palloc(sizeof(struct paramDesc)); - snprintf(paramName, 9, ":r%d", fdwState->oraTable->cols[i]->pgattnum); - param->name = pstrdup(paramName); - param->type = fdwState->oraTable->cols[i]->pgtype; - param->bindType = BIND_OUTPUT; - param->value = NULL; - param->node = NULL; - param->bindh = NULL; - param->colnum = i; - param->next = fdwState->paramList; - fdwState->paramList = param; - - if (firstcol) - { - firstcol = false; - appendStringInfo(&sql, " INTO "); - } - else - appendStringInfo(&sql, ", "); - appendStringInfo(&sql, "%s", paramName); - } + appendReturningClause(&sql, fdwState); fdwState->query = sql.data; @@ -2018,6 +1929,101 @@ oracleBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List * ALLOCSET_SMALL_SIZES); } +#if PG_VERSION_NUM >= 110000 +/* + * oracleBeginForeignInsert + * Initialize the FDW state for COPY to a foreign table. + */ +void oracleBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *rinfo) +{ + EState *estate = mtstate->ps.state; + struct OracleFdwState *fdw_state; + StringInfoData buf; + struct paramDesc *param; + HeapTuple tuple; + int i; + + elog(DEBUG3, "oracle_fdw: execute foreign table COPY on %d", RelationGetRelid(rinfo->ri_RelationDesc)); + + fdw_state = getFdwState(RelationGetRelid(rinfo->ri_RelationDesc), NULL); + + fdw_state->session = oracleGetSession( + fdw_state->dbserver, + fdw_state->user, + fdw_state->password, + fdw_state->nls_lang, + fdw_state->oraTable->pgname, + GetCurrentTransactionNestLevel() + ); + + if (hasTrigger(rinfo->ri_RelationDesc, CMD_INSERT)) + { + /* all attributes are needed for the RETURNING clause */ + for (i=0; ioraTable->ncols; ++i) + if (fdw_state->oraTable->cols[i]->pgname != NULL) + { + /* throw an error if it is a LONG or LONG RAW column */ + if (fdw_state->oraTable->cols[i]->oratype == ORA_TYPE_LONGRAW + || fdw_state->oraTable->cols[i]->oratype == ORA_TYPE_LONG) + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), + errmsg("columns with Oracle type LONG or LONG RAW cannot be used in RETURNING clause"), + errdetail("Column \"%s\" of foreign table \"%s\" is of Oracle type LONG%s.", + fdw_state->oraTable->cols[i]->pgname, + fdw_state->oraTable->pgname, + fdw_state->oraTable->cols[i]->oratype == ORA_TYPE_LONG ? "" : " RAW"))); + + fdw_state->oraTable->cols[i]->used = 1; + } + } + + /* construct an INSERT query */ + initStringInfo(&buf); + buildInsertQuery(&buf, fdw_state); + appendReturningClause(&buf, fdw_state); + fdw_state->query = pstrdup(buf.data); + + /* get the type output functions for the parameters */ + output_funcs = (regproc *)palloc0(fdw_state->oraTable->ncols * sizeof(regproc *)); + for (param=fdw_state->paramList; param!=NULL; param=param->next) + { + /* ignore output parameters */ + if (param->bindType == BIND_OUTPUT) + continue; + + tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(fdw_state->oraTable->cols[param->colnum]->pgtype)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for type %u", fdw_state->oraTable->cols[param->colnum]->pgtype); + output_funcs[param->colnum] = ((Form_pg_type)GETSTRUCT(tuple))->typoutput; + ReleaseSysCache(tuple); + } + + oraclePrepareQuery(fdw_state->session, fdw_state->query, fdw_state->oraTable, 0); + + /* create a memory context for short-lived memory */ + fdw_state->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "oracle_fdw temporary data", + ALLOCSET_SMALL_SIZES); + + rinfo->ri_FdwState = (void *)fdw_state; +} + +void +oracleEndForeignInsert(EState *estate, ResultRelInfo *rinfo) +{ + struct OracleFdwState *fdw_state = (struct OracleFdwState *)rinfo->ri_FdwState; + + elog(DEBUG3, "oracle_fdw: end foreign table COPY on %d", RelationGetRelid(rinfo->ri_RelationDesc)); + + MemoryContextDelete(fdw_state->temp_cxt); + + /* release the Oracle session */ + oracleCloseStatement(fdw_state->session); + pfree(fdw_state->session); + fdw_state->session = NULL; +} +#endif /*PG_VERSION_NUM */ + /* * oracleExecForeignInsert * Set the parameter values from the slots and execute the INSERT statement. @@ -3347,7 +3353,7 @@ acquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targro /* get connection options, connect and get the remote table description */ fdw_state = getFdwState(RelationGetRelid(relation), &sample_percent); - fdw_state ->paramList = NULL; + fdw_state->paramList = NULL; fdw_state->rowcount = 0; /* construct query */ @@ -5790,6 +5796,134 @@ setModifyParameters(struct paramDesc *paramList, TupleTableSlot *newslot, TupleT } } } + +bool +hasTrigger(Relation rel, CmdType cmdtype) +{ + return rel->trigdesc + && ((cmdtype == CMD_UPDATE && rel->trigdesc->trig_update_after_row) + || (cmdtype == CMD_INSERT && rel->trigdesc->trig_insert_after_row) + || (cmdtype == CMD_DELETE && rel->trigdesc->trig_delete_after_row)); +} + +void +buildInsertQuery(StringInfo sql, struct OracleFdwState *fdwState) +{ + bool firstcol; + int i; + char paramName[10]; + + appendStringInfo(sql, "INSERT INTO %s (", fdwState->oraTable->name); + + firstcol = true; + for (i = 0; i < fdwState->oraTable->ncols; ++i) + { + /* don't add columns beyond the end of the PostgreSQL table */ + if (fdwState->oraTable->cols[i]->pgname == NULL) + continue; + + if (firstcol) + firstcol = false; + else + appendStringInfo(sql, ", "); + appendStringInfo(sql, "%s", fdwState->oraTable->cols[i]->name); + } + + appendStringInfo(sql, ") VALUES ("); + + firstcol = true; + for (i = 0; i < fdwState->oraTable->ncols; ++i) + { + /* don't add columns beyond the end of the PostgreSQL table */ + if (fdwState->oraTable->cols[i]->pgname == NULL) + continue; + + /* check that the data types can be converted */ + checkDataType( + fdwState->oraTable->cols[i]->oratype, + fdwState->oraTable->cols[i]->scale, + fdwState->oraTable->cols[i]->pgtype, + fdwState->oraTable->pgname, + fdwState->oraTable->cols[i]->pgname + ); + + /* add a parameter description for the column */ + snprintf(paramName, 9, ":p%d", fdwState->oraTable->cols[i]->pgattnum); + addParam(&fdwState->paramList, paramName, fdwState->oraTable->cols[i]->pgtype, + fdwState->oraTable->cols[i]->oratype, i); + + /* add parameter name */ + if (firstcol) + firstcol = false; + else + appendStringInfo(sql, ", "); + + appendAsType(sql, paramName, fdwState->oraTable->cols[i]->pgtype); + } + + appendStringInfo(sql, ")"); +} + +void +appendReturningClause(StringInfo sql, struct OracleFdwState *fdwState) +{ + int i; + bool firstcol; + struct paramDesc *param; + char paramName[10]; + + /* add the RETURNING clause itself */ + firstcol = true; + for (i=0; ioraTable->ncols; ++i) + if (fdwState->oraTable->cols[i]->used) + { + if (firstcol) + { + firstcol = false; + appendStringInfo(sql, " RETURNING "); + } + else + appendStringInfo(sql, ", "); + appendStringInfo(sql, "%s", fdwState->oraTable->cols[i]->name); + } + + /* add the parameters for the RETURNING clause */ + firstcol = true; + for (i=0; ioraTable->ncols; ++i) + if (fdwState->oraTable->cols[i]->used) + { + /* check that the data types can be converted */ + checkDataType( + fdwState->oraTable->cols[i]->oratype, + fdwState->oraTable->cols[i]->scale, + fdwState->oraTable->cols[i]->pgtype, + fdwState->oraTable->pgname, + fdwState->oraTable->cols[i]->pgname + ); + + /* create a new entry in the parameter list */ + param = (struct paramDesc *)palloc(sizeof(struct paramDesc)); + snprintf(paramName, 9, ":r%d", fdwState->oraTable->cols[i]->pgattnum); + param->name = pstrdup(paramName); + param->type = fdwState->oraTable->cols[i]->pgtype; + param->bindType = BIND_OUTPUT; + param->value = NULL; + param->node = NULL; + param->bindh = NULL; + param->colnum = i; + param->next = fdwState->paramList; + fdwState->paramList = param; + + if (firstcol) + { + firstcol = false; + appendStringInfo(sql, " INTO "); + } + else + appendStringInfo(sql, ", "); + appendStringInfo(sql, "%s", paramName); + } +} #endif /* WRITE_API */ /*