source: trunk/zoo-project/zoo-kernel/service_callback.c @ 976

Last change on this file since 976 was 972, checked in by djay, 4 years ago

Small fix to build when json is activated and mapserver is not

  • Property svn:keywords set to Id
File size: 31.4 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017-2019 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#ifdef USE_CALLBACK
30#include "service_internal_ms.h"
31#endif
32#include <pthread.h>
33#include <libxml/tree.h>
34#include <libxml/parser.h>
35#include <libxml/xpath.h>
36#include <libxml/xpathInternals.h>
37
38#include <libxslt/xslt.h>
39#include <libxslt/xsltInternals.h>
40#include <libxslt/transform.h>
41#include <libxslt/xsltutils.h>
42
43#include "service_callback.h"
44#include "service_json.h"
45#include "sqlapi.h"
46#include <ulinet.h>
47
48
49#ifdef __cplusplus
50extern "C" {
51#endif
52
53  /**
54   * Parameter definition to be used for sending parameters to a thread.
55   */
56  typedef struct {
57    maps *conf;      //!< the main configuration file
58    map *url;        //!< the callback url maps
59    json_object *res;//!< the JSON object to post
60    int step;        //!< the current step [0,6]
61    int state;       //!< the current state [0,1]
62  } local_params;
63
64  /**
65   * Number of threads
66   */
67  int nbThreads=0;
68  /**
69   * Current step
70   */
71  int cStep=0;
72  /**
73   * Maximum value of PercentCompleted
74   */
75  int maxProgress=0;
76  /**
77   * Is there any ongoing HTTP request
78   */
79  int isOngoing=0;
80  /**
81   * Threads array
82   */
83  pthread_t* myThreads=NULL;
84  /**
85   * Steps array
86   */
87  bool steps[7][2]={
88    {false,false},
89    {false,false},
90    {false,false},
91    {false,false},
92    {false,false},
93    {false,false},
94    {false,false}
95  };
96  /**
97   * Arguments array to give to the _invokeCallback thread's function
98   */
99  local_params** local_arguments;
100 
101  /**
102   * Check if a service name is prohibited, meaning that the Kernel doesn't have
103   * to invoke the callback for this specific service.
104   *
105   * @param conf the main configuration file maps
106   * @param serviceName the serviceName
107   * @return a bool true if the service is prohibited, false in other case
108   */
109  bool isProhibited(maps* conf,const char* serviceName){
110    map* plist=getMapFromMaps(conf,"callback","prohibited");
111    if(plist!=NULL){
112      char *tmp=plist->value;
113      char *tmpS=strtok(tmp,",");
114      while(tmpS!=NULL){
115        if(strcmp(serviceName,tmpS)==0)
116          return true;
117        tmpS=strtok(NULL,",");
118      }
119    }
120    return false;
121  }
122  /**
123   * Practically invoke the callback, meaning sending the HTTP POST request.
124   *
125   * @param args local_params containing all the variables required
126   */
127  void* _invokeBasicCallback(void* args){
128#ifdef CALLBACK_DEBUG
129    fprintf(stderr,"************************* From thread %d %s %d: REQUEST CONFIGURE (%s)\n",pthread_self(),__FILE__,__LINE__,arg->url->value);
130    fflush(stderr);
131#endif
132    local_params* arg=(local_params*)args;
133    if(arg->state<cStep){
134#ifdef CALLBACK_DEBUG
135      fprintf(stderr,"************************* From thread %d %s %d: REQUEST CANCELLED (%s) EXIT!\n",pthread_self(),__FILE__,__LINE__,arg->url->value);
136      fflush(stderr);
137#endif
138      freeMaps(&arg->conf);
139      free(arg->conf);
140      freeMap(&arg->url);
141      free(arg->url);
142      pthread_exit(NULL);
143      return NULL;
144    }
145    HINTERNET hInternet,res1;
146    const struct tm *tm;
147    size_t len;
148    char *tmp1;
149    map *tmpStatus;
150    map* pmTmp=getMapFromMaps(arg->conf,"lenv","status");
151    hInternet=InternetOpen("ZooWPSClient\0",
152                           INTERNET_OPEN_TYPE_PRECONFIG,
153                           NULL,NULL, 0);
154    if(!CHECK_INET_HANDLE(hInternet)){
155      InternetCloseHandle (&hInternet);
156      return NULL;
157    }
158    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
159    while( arg->state != SERVICE_SUCCEEDED && arg->state != SERVICE_FAILED && isOngoing>0 ){
160      zSleep(100);
161    }
162    if(arg->state==SERVICE_STARTED && pmTmp!=NULL){
163      if(maxProgress<=atoi(pmTmp->value)){
164        maxProgress=atoi(pmTmp->value);
165      }else{
166#ifdef CALLBACK_DEBUG
167        fprintf(stderr,"************************* From thread %d %s %d: REQUEST CANCELLED (%s) EXIT!\n",pthread_self(),__FILE__,__LINE__,arg->url->value);
168        fflush(stderr);
169#endif
170        freeMaps(&arg->conf);
171        free(arg->conf);
172        freeMap(&arg->url);
173        free(arg->url);
174        pthread_exit(NULL);
175        return NULL;
176      }
177    }else
178      maxProgress=101;
179    isOngoing=1;
180    maps* tmpConf=createMaps("main");
181    tmpConf->content=createMap("memory","load");
182
183    hInternet.waitingRequests[0] = zStrdup(arg->url->value);
184    res1 = InternetOpenUrl (&hInternet,
185                            hInternet.waitingRequests[0], 
186                            (char*)jsonStr, strlen(jsonStr),
187                            INTERNET_FLAG_NO_CACHE_WRITE,
188                            0,tmpConf);
189    AddHeaderEntries(&hInternet,arg->conf);
190    AddMissingHeaderEntry(&hInternet.ihandle[hInternet.nb-1],"Content-Type","application/json");
191#ifdef CALLBACK_DEBUG
192    curl_easy_setopt(hInternet.ihandle[hInternet.nb-1].handle, CURLOPT_VERBOSE, 1);
193#endif
194    if(hInternet.ihandle[hInternet.nb-1].header!=NULL)
195      curl_easy_setopt(hInternet.ihandle[hInternet.nb-1].handle,CURLOPT_HTTPHEADER,hInternet.ihandle[hInternet.nb-1].header);
196    processDownloads(&hInternet);
197    freeMaps(&tmpConf);
198    free(tmpConf);
199#ifdef CALLBACK_DEBUG
200    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
201                                 * sizeof (char));
202    if (tmp == NULL)
203      {
204        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
205        setMapInMaps(arg->conf,"lenv","code","InternalError");
206        return NULL;
207      }
208    size_t bRead;
209    InternetReadFile (hInternet.ihandle[0],
210                      (LPVOID) tmp,
211                      hInternet.
212                      ihandle[0].nDataLen,
213                      &bRead);
214    tmp[hInternet.ihandle[0].nDataLen] = 0;
215    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END \n%s",pthread_self(),__FILE__,__LINE__,tmp);
216    fflush(stderr);
217    free(tmp);
218#endif
219    json_object_put(arg->res);
220    InternetCloseHandle(&hInternet);
221    isOngoing=0;
222    freeMaps(&arg->conf);
223    free(arg->conf);
224    freeMap(&arg->url);
225    if(arg->url!=NULL)
226      free(arg->url);
227    pthread_exit(NULL);
228  }
229
230  /**
231   * Invoke the callback in case there is a [subscriber] section containing one
232   * or more url parameter.
233   *
234   * @param conf the maps containing the main configuration file definitions
235   * @param state the service state SERVICE_SUCCEEDED / STARTED / FAILED
236   * @return bool true in case of success, false in other cases
237   */
238  bool invokeBasicCallback(maps* conf,int state){
239    map* url=getMapFromMaps(conf,"subscriber","inProgressUri");
240    if(state==SERVICE_SUCCEEDED)
241      url=getMapFromMaps(conf,"subscriber","successUri");
242    else
243      if(state==SERVICE_FAILED)
244        url=getMapFromMaps(conf,"subscriber","failedUri");
245    if(url==NULL)
246      return false;
247    map* url0=createMap("url",url->value);
248    map* sname=getMapFromMaps(conf,"lenv","identifier");
249    if(sname!=NULL && isProhibited(conf,sname->value))
250      return false;
251    if(state<cStep)
252      return true;
253    if(cStep!=state || isOngoing==0){
254      json_object *res=NULL;
255      if(state==SERVICE_SUCCEEDED || state==SERVICE_FAILED){
256        maps* pmsTmp=getMaps(conf,"lenv");
257        setMapInMaps(conf,"lenv","no-write","true");
258        map* pmTmp=getMapFromMaps(conf,"lenv","usid");
259        if(pmTmp!=NULL){
260          map* pmResponse=getMapFromMaps(conf,"lenv","jsonStr");
261          res=parseJson(conf,pmResponse->value);
262        }
263      }else
264        res=createStatus(conf,state);
265      if(local_arguments==NULL)
266        local_arguments=(local_params**)malloc(sizeof(local_params*));
267      else
268        local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
269      local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int)));       
270      local_arguments[nbThreads]->conf=dupMaps(&conf);
271      local_arguments[nbThreads]->url=url0;
272      local_arguments[nbThreads]->res=res;
273      local_arguments[nbThreads]->step=0;
274      local_arguments[nbThreads]->state=state;
275      cStep=state;
276      if(myThreads==NULL)
277        myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
278      else
279        myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
280      if(pthread_create(&myThreads[nbThreads], NULL, _invokeBasicCallback, (void*)local_arguments[nbThreads])==-1){
281        setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
282        return false;
283      }
284      nbThreads++;
285    }
286    return true;
287  }
288
289#ifdef USE_CALLBACK
290  /**
291   * Practically invoke the callback, meaning sending the HTTP POST request.
292   *
293   * @param args local_params containing all the variables required
294   */
295  void* _invokeCallback(void* args){
296    local_params* arg=(local_params*)args;
297    HINTERNET hInternet,res1;
298    const struct tm *tm;
299    size_t len;
300    time_t now;
301    char *tmp1;
302    map *tmpStatus;
303    maps* tmpConf=createMaps("main");
304    tmpConf->content=createMap("memory","load");
305    hInternet=InternetOpen("ZooWPSClient\0",
306                           INTERNET_OPEN_TYPE_PRECONFIG,
307                           NULL,NULL, 0);
308    if(!CHECK_INET_HANDLE(hInternet)){
309      InternetCloseHandle (&hInternet);
310      return NULL;
311    }
312    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
313    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
314    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
315    hInternet.waitingRequests[0] = zStrdup(URL);
316    free(URL);
317#ifdef CALLBACK_DEBUG
318    now = time ( NULL );
319    tm = localtime ( &now );
320    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
321    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
322    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
323    fprintf(stderr," * JSON: [%s] \n",jsonStr);
324    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
325    fprintf(stderr," * DATE: %s/ \n\n",tmp1);
326    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
327    free(tmp1);
328#endif
329    while( (arg->step!=7 || isOngoing>0) &&
330           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
331           ){
332      zSleep(100);
333    }
334    isOngoing=1;
335#ifdef CALLBACK_DEBUG
336    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
337    int i=0;
338    for(i=0;i<7;i++){
339      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
340    }
341#endif
342   
343    now = time ( NULL );
344    tm = localtime ( &now );
345   
346    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
347    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
348
349#ifdef CALLBACK_DEBUG   
350    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
351    fflush(stderr);
352#endif   
353    free(tmp1);
354    res1 = InternetOpenUrl (&hInternet,
355                            hInternet.waitingRequests[0], 
356                            (char*)jsonStr, strlen(jsonStr),
357                            INTERNET_FLAG_NO_CACHE_WRITE,
358                            0,tmpConf);
359    AddHeaderEntries(&hInternet,arg->conf);
360    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
361    processDownloads(&hInternet);
362    freeMaps(&tmpConf);
363    free(tmpConf);
364    now = time ( NULL );
365    tm = localtime ( &now );
366    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
367    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
368   
369#ifdef CALLBACK_DEBUG   
370    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
371#endif
372    free(tmp1);
373    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
374                                 * sizeof (char));
375    if (tmp == NULL)
376      {
377        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
378        setMapInMaps(arg->conf,"lenv","code","InternalError");
379        return NULL;
380      }
381    size_t bRead;
382    InternetReadFile (hInternet.ihandle[0],
383                      (LPVOID) tmp,
384                      hInternet.
385                      ihandle[0].nDataLen,
386                      &bRead);
387    tmp[hInternet.ihandle[0].nDataLen] = 0;
388    json_object_put(arg->res);
389    InternetCloseHandle(&hInternet);
390    isOngoing=0;
391    if(cStep==0 || cStep==6 || arg->state==1)
392      cStep=arg->step+1;
393#ifdef CALLBACK_DEBUG
394    now = time ( NULL );
395    tm = localtime ( &now );
396    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
397    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
398    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
399    for(i=0;i<7;i++){
400      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
401    }
402    fprintf(stderr,"Result: \n%s\n\n",tmp);
403    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
404    fflush(stderr);
405    free(tmp1);
406#endif
407    steps[arg->step][arg->state]=true;
408    free(tmp);
409#ifdef CALLBACK_DEBUG
410    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
411    fflush(stderr);
412#endif
413    pthread_exit(NULL);
414  }
415 
416  /**
417   * Invoke the callback in case there is a [callback] section containing a url parameter
418   *
419   * @param m the maps containing the main configuration file definitions
420   * @param inputs the inputs defined in the request (can be null if not yet initialized)
421   * @param inputs the outputs provided in the request (can be null if not yet initialized)
422   * @param step the step number, steps are defined as:
423   *  0: Analyze creation
424   *  1: Fetching Data Inputs
425   *  2: Uploading data inputs to cluster
426   *  3: Creating Job Script
427   *  4: Submitting Job to Cluster
428   *  5: Downloading processed output from cluster
429   *  6: Finalize
430   *  7: Dismiss or Error
431   * @param state 0 in case the step starts, 1 when it ends
432   * @return bool true in case of success, false in other cases
433   */
434  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
435    map* url=getMapFromMaps(conf,"callback","url");
436    if(url==NULL)
437      return false;
438     
439    maps* lenv=getMaps(conf,"lenv");
440    map* sname=getMap(lenv->content,"identifier");
441    if(sname!=NULL && isProhibited(conf,sname->value))
442      return false;
443     
444    json_object *res=json_object_new_object();
445
446    map* sid=getMapFromMaps(conf,"lenv","usid");
447    if(sid!=NULL){
448      json_object *jsStr=json_object_new_string(sid->value);
449      json_object_object_add(res,"jobid",jsStr);
450    }
451    const struct tm *tm;
452    size_t len;
453    time_t now;
454    char *tmp1;
455    map *tmpStatus;
456 
457    now = time ( NULL );
458    tm = localtime ( &now );
459
460    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
461    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
462    json_object *jsStr0=json_object_new_string(tmp1);
463    json_object_object_add(res,"datetime",jsStr0);
464    free(tmp1);
465   
466    switch(step){
467    case 0: {
468      // Create a new analyze
469      maps* lenv=getMaps(conf,"lenv");
470      sid=getMapFromMaps(conf,"renv","xrequest");
471      if(sid!=NULL){
472        json_object *jsStr=json_object_new_string(sid->value);
473        json_object_object_add(res,"request_execute_content",jsStr);
474      }
475      sid=getMapFromMaps(conf,"lenv","identifier");
476      if(sid!=NULL){
477        json_object *jsStr=json_object_new_string(sid->value);
478        json_object_object_add(res,"process_identifier",jsStr);
479      }
480      // Save the Execute request on disk
481      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
482      map* req=getMapFromMaps(conf,"renv","xrequest");
483      sid=getMapFromMaps(conf,"lenv","usid");
484      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
485      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
486      FILE* saveExecute=fopen(executePath,"wb");
487      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
488      fflush(saveExecute);
489      fclose(saveExecute);
490      setMapInMaps(conf,"lenv","execute_file",executePath);
491      free(executePath);
492      break;
493    }
494     
495    case 1: {
496      // Update the execute request stored on disk at step 0,0 to modify the references used.
497      if(state==1){
498        maps* curs=inputs;
499        xmlInitParser();
500        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
501        while(curs!=NULL){
502          map* length=getMap(curs->content,"length");
503          map* useMS=getMap(curs->content,"useMapserver");
504          if(length==NULL){
505            addToMap(curs->content,"length","1");
506            length=getMap(curs->content,"length");
507          }
508          int len=atoi(length->value);
509          for(int ii=0;ii<len;ii++){
510            if(getMapArray(curs->content,"byValue",ii)!=NULL && getMapArray(curs->content,"mimeType",ii)!=NULL && useMS!=NULL && strncasecmp(useMS->value,"true",4)==0){
511              map* tmpMap=getMapArray(curs->content,"value",ii);
512              char tmpStr[100];
513              sprintf(tmpStr,"%ld",strlen(tmpMap->value));
514              setMapArray(curs->content,"size",ii,tmpStr);
515              tmpMap=getMapArray(curs->content,"mimeType",ii);
516              setMapArray(curs->content,"fmimeType",ii,tmpMap->value);
517              tmpMap=getMapArray(curs->content,"cache_file",ii);
518              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
519              setMapArray(curs->content,"storage",ii,tmpMap->value);
520              setReferenceUrl(conf,curs);
521              addIntToMap(curs->content,"published_id",ii+1);
522              const char *params[7];
523              int xmlLoadExtDtdDefaultValue;
524              int hasFile=-1;
525              map* xslPath=getMapFromMaps(conf,"callback","template");
526              map* filePath=getMapArray(curs->content,"ref_wfs_link",ii);
527              if(filePath==NULL)
528                filePath=getMap(curs->content,"ref_wcs_link");
529              char* inputName=curs->name;
530              if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
531                break;
532              char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
533              char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
534              char tmpParam2[16];
535              sprintf(tmpParam2,"string(\"%d\")",ii);
536              setMapArray(curs->content,"href",ii,filePath->value);
537              setMapArray(curs->content,"xlink:href",ii,filePath->value);
538              tmpMap=getMapArray(curs->content,"cache_url",ii);
539              if(tmpMap!=NULL)
540                setMapArray(curs->content,"xlink:href",ii,tmpMap->value);
541              else
542                setMapArray(curs->content,"xlink:href",ii,filePath->value);
543              sprintf(tmpParam,"string(\"%s\")",curs->name);
544              sprintf(tmpParam1,"string(\"%s\")",filePath->value);
545              sprintf(tmpParam2,"string(\"%d\")",ii);
546              params[0]="attr";
547              params[1]=tmpParam;
548              params[2]="value";
549              params[3]=tmpParam1;//filePath->value;
550              params[4]="cnt";
551              params[5]=tmpParam2;
552              params[6]=NULL;
553              fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s INDEX: %s\n",
554                      tmpParam,tmpParam1,tmpParam2);
555              fflush(stderr);
556              xmlSubstituteEntitiesDefault(1);
557              xmlLoadExtDtdDefaultValue = 0;
558              xsltStylesheetPtr cur = NULL;
559              xmlDocPtr doc, res;
560              cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
561              doc = xmlParseFile(xmlPath->value);
562              fflush(stderr);
563              res = xsltApplyStylesheet(cur, doc, params);
564              xmlChar *xmlbuff;
565              int buffersize;
566              xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
567              // Store the executeRequest in file again
568              free(tmpParam);
569              free(tmpParam1);
570              fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
571              fflush(stderr);
572              FILE* saveExecute=fopen(xmlPath->value,"wb");
573              if(saveExecute!=NULL){
574                fwrite(xmlbuff,1,buffersize,saveExecute);
575                fflush(saveExecute);
576                fclose(saveExecute);
577              }
578              xmlFree(xmlbuff);
579              xmlFreeDoc(doc);
580              xsltFreeStylesheet(cur);
581            }
582          }
583          addIntToMap(curs->content,"published_id",0);
584          curs=curs->next;
585        }
586        xmlCleanupParser();
587        FILE* f0=fopen(xmlPath->value,"rb");
588        if(f0!=NULL){
589          long flen;
590          char *fcontent;
591          fseek (f0, 0, SEEK_END);
592          flen = ftell (f0);
593          fseek (f0, 0, SEEK_SET);
594          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
595          fread(fcontent,flen,1,f0);
596          fcontent[flen]=0;
597          fclose(f0);
598          map *schema=getMapFromMaps(conf,"database","schema");
599          map* sid=getMapFromMaps(conf,"lenv","usid");
600          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
601          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
602#ifdef RELY_ON_DB
603          execSql(conf,1,req);
604#endif
605          free(fcontent);
606          free(req);
607        }
608      }
609
610      // Fetching data inputs
611      maps* curs=inputs;
612      dumpMaps(curs);
613      const char *keys[11][2]={
614        {
615          "xlink:href",
616          "ref_download_link"
617        },
618        {
619          "cache_file",
620          "cachefile"
621        },
622        {
623          "fmimeType",
624          "mimetype"
625        },
626        {
627          "size",
628          "size"
629        },
630        {
631          "ref_wms_link",
632          "ref_wms_link"
633        },
634        {
635          "ref_wfs_link",
636          "ref_wfs_link"
637        },
638        {
639          "ref_wcs_link",
640          "ref_wcs_link"
641        },
642        {
643          "ref_wcs_link",
644          "ref_wcs_link"
645        },
646        {
647          "ref_wcs_preview_link",
648          "ref_wcs_preview_link"
649        },
650        {
651          "geodatatype",
652          "datatype"
653        },
654        {
655          "wgs84_extent",
656          "boundingbox"
657        }       
658      };
659      json_object *res1=json_object_new_object();
660      while(curs!=NULL){
661        if(getMap(curs->content,"length")==NULL){
662          addToMap(curs->content,"length","1");
663        }
664        map* length=getMap(curs->content,"length");
665        int len=atoi(length->value);
666        json_object *res3;
667        int hasRef=-1;
668        for(int ii=0;ii<len;ii++){
669          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
670          sid=getMapArray(curs->content,"ref_wms_link",ii);
671          json_object *res2=json_object_new_object();
672          if(tmpMap!=NULL){
673            if(sid==NULL){
674              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
675              setMapArray(curs->content,"storage",ii,tmpMap->value);
676            }
677            struct stat buf;
678            char timeStr[ 100 ] = "";
679            if (stat(tmpMap->value, &buf)==0){
680              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
681              json_object *jsStr=json_object_new_string(timeStr);
682              json_object_object_add(res2,"creation_date",jsStr);
683            }
684            tmpMap=getMapArray(curs->content,"fmimeType",ii);
685            if(tmpMap!=NULL){
686              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
687            }
688            setReferenceUrl(conf,curs);
689          }else{         
690          }
691          addIntToMap(curs->content,"published_id",ii+1);
692          int i=0;
693          for(;i<11;i++){
694            sid=getMapArray(curs->content,keys[i][0],ii);
695            if(sid!=NULL){
696              json_object *jsStr=json_object_new_string(sid->value);
697              json_object_object_add(res2,keys[i][1],jsStr);
698              if(i==0){
699                hasRef=1;
700                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
701                json_object_object_add(res2,"dataOrigin",jsStr1);
702              }
703            }
704          }
705          if(len>1){
706            if(ii==0)
707              res3=json_object_new_array();
708            json_object_array_add(res3,res2);
709          }else
710            res3=res2;
711        }
712        if(hasRef<0)
713          json_object_put(res3);
714        else{
715          json_object_object_add(res1,curs->name,json_object_get(res3));
716          json_object_put(res3);
717        }
718        addIntToMap(curs->content,"published_id",0);
719        curs=curs->next;
720      }
721      json_object_object_add(res,"inputs",res1);
722      break;
723    }
724     
725    case 2: {
726      // Uploading data input to cluster
727      maps* in=getMaps(conf,"uploadQueue");
728      if(in!=NULL){
729        maps* curs=in;
730        map* length=getMapFromMaps(in,"uploadQueue","length");
731        if(length!=NULL){
732          json_object *res1=json_object_new_object();
733          int limit=atoi(length->value);
734          int i=0;
735          maps* uploadQueue=getMaps(in,"uploadQueue");
736          map* tmp=uploadQueue->content;
737          for(;i<limit;i++){
738            map* tmp0=getMapArray(tmp,"input",i);
739            map* tmp1=getMapArray(tmp,"localPath",i);
740            map* tmp2=getMapArray(tmp,"targetPath",i);
741            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
742              json_object *res2=json_object_new_object();
743              json_object *jsStr=json_object_new_string(tmp1->value);
744              json_object_object_add(res2,"local_path",jsStr);
745              jsStr=json_object_new_string(tmp2->value);
746              json_object_object_add(res2,"target_path",jsStr);
747              json_object *res4=NULL;
748              if(json_object_object_get_ex(res1,tmp0->value,&res4)!=FALSE){
749                if(json_object_is_type(res4,json_type_null)){
750                  json_object_object_add(res1,tmp0->value,res2);
751                }else{
752                  if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){
753                    json_object *res3=json_object_new_array();
754                    json_object_array_add(res3,json_object_get(res4));
755                    json_object_array_add(res3,res2);
756                    json_object_object_del(res1,tmp0->value);
757                    json_object_object_add(res1,tmp0->value,res3);
758                  }else
759                    json_object_array_add(res4,res2);
760                }
761              }
762            }
763          }
764          json_object_object_add(res,"inputs",res1);
765        }
766      }
767      break;
768    }
769     
770    case 3: {
771      // Generating job script
772      sid=getMapFromMaps(conf,"lenv","local_script");
773      if(sid!=NULL){
774        json_object *jsStr=json_object_new_string(sid->value);
775        json_object_object_add(res,"script",jsStr);
776      }
777      break;
778    }
779     
780    case 4: {
781      // Submitting job to cluster
782      sid=getMapFromMaps(conf,"lenv","remote_script");
783      if(sid!=NULL){
784        json_object *jsStr=json_object_new_string(sid->value);
785        json_object_object_add(res,"script",jsStr);
786      }
787      break;
788    }
789     
790    case 5: {
791      // Downloading process outputs from cluster
792      maps* curs=outputs;
793      dumpMaps(curs);
794      const char *keys[10][2]={
795        {
796          "Reference",
797          "ref"
798        },
799        {
800          "generated_file",
801          "cachefile"
802        },
803        {
804          "mimeType",
805          "mimetype"
806        },
807        {
808          "size",
809          "size"
810        },
811        {
812          "geodatatype",
813          "datatype"
814        },
815        {
816          "wgs84_extent",
817          "boundingbox"
818        },
819        {
820          "ref_wms_link",
821          "ref_wms_link"
822        },
823        {
824          "ref_wcs_link",
825          "ref_wcs_link"
826        },
827        {
828          "ref_wcs_preview_link",
829          "ref_wcs_preview_link"
830        },
831        {
832          "ref_wfs_link",
833          "ref_wfs_link"
834        }       
835      };
836      const char* specifics[5][2]={
837        {
838          "download_link",
839          "ref_download_link"
840        },
841        {
842          "wms_link",
843          "ref_wms_link"
844        },
845        {
846          "wfs_link",
847          "ref_wfs_link"
848        },
849        {
850          "wcs_link",
851          "ref_wcs_link"
852        },
853        {
854          "wcs_link",
855          "ref_wcs_preview_link"
856        }
857      };
858      json_object *res1=json_object_new_object();
859      while(curs!=NULL){       
860        json_object *res2=json_object_new_object();
861        int i=0;
862        int hasRef=-1;
863        for(;i<10;i++){
864          sid=getMap(curs->content,keys[i][0]);
865          if(sid!=NULL){
866            json_object *jsStr=json_object_new_string(sid->value);
867            json_object_object_add(res2,keys[i][1],jsStr);
868            if(i==0){
869              hasRef=1;
870              json_object_object_add(res2,"ref_download_link",jsStr);
871            }
872            if(i==1){
873              struct stat buf;
874              char timeStr[ 100 ] = "";
875              if (stat(sid->value, &buf)==0){
876                strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
877                json_object *jsStr=json_object_new_string(timeStr);
878                json_object_object_add(res2,"creation_date",jsStr);
879              }
880            }
881          }
882        }
883        if(hasRef>0)
884          json_object_object_add(res1,curs->name,res2);
885        else{
886          maps* curs0=curs->child;
887          int i=0;
888          int bypass=-1;
889          for(i=0;i<5;i++){
890            maps* specificMaps;
891            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
892              int hasRef0=-1;
893              int i0=0;
894              for(;i0<6;i0++){
895                sid=getMap(specificMaps->content,keys[i0][0]);
896                if(sid!=NULL){
897                  json_object *jsStr=json_object_new_string(sid->value);
898                  if(i0==0){
899                    json_object_object_add(res2,specifics[i][1],jsStr);
900                  }
901                  else
902                    json_object_object_add(res2,keys[i0][1],jsStr);
903                  hasRef0=1;
904                  bypass=1;
905                  if(i==1){
906                    struct stat buf;
907                    char timeStr[ 100 ] = "";
908                    if (stat(sid->value, &buf)==0){
909                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
910                      json_object *jsStr=json_object_new_string(timeStr);
911                      json_object_object_add(res2,"creation_date",jsStr);
912                    }
913                  }
914                }
915              }       
916            }
917          }
918          if(bypass<0)
919            while(curs0!=NULL){
920              json_object *res3=json_object_new_object();
921              int i0=0;
922              int hasRef0=-1;
923              for(;i0<10;i0++){
924                sid=getMap(curs0->content,keys[i0][0]);
925                if(sid!=NULL){
926                  json_object *jsStr=json_object_new_string(sid->value);
927                  json_object_object_add(res3,keys[i0][1],jsStr);
928                  hasRef0=1;
929                }
930              }
931              if(hasRef0<0)
932                json_object_put(res3);
933              else
934                json_object_object_add(res2,curs0->name,res3);
935              curs0=curs0->next;
936            }
937          json_object_object_add(res1,curs->name,res2);
938        }
939        curs=curs->next;
940      }
941      json_object_object_add(res,"outputs",res1);
942      break;
943    }
944     
945    case 6: {
946      // Finalize HPC
947      const char *keys[6][2]={
948        {
949          //"SubmitTime",
950          "Submit",
951          "hpc_submission_date"
952        },
953        {
954          "JobId",
955          "hpc_job_identifier"
956        },
957        {
958          "JobName",
959          "hpc_job_name"
960        },
961        {
962          //"StartTime",
963          "Start",
964          "hpc_start_date"
965        },
966        {
967          //"EndTime",
968          "End",
969          "hpc_end_date"
970        },
971        {
972          //"JobState",
973          "State",
974          "hpc_status"
975        }       
976      };
977      int i=0;
978      if(getMaps(conf,"henv")!=NULL){
979        for(i=0;i<6;i++){
980          sid=getMapFromMaps(conf,"henv",keys[i][0]);
981          if(sid!=NULL){
982            json_object *jsStr=json_object_new_string(sid->value);
983            json_object_object_add(res,keys[i][1],jsStr);
984          }
985        }
986      }
987      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
988        json_object *jsStr=json_object_new_string(sid->value);
989        json_object_object_add(res,"hpc_cpu_usage",jsStr);
990      }else{
991        json_object *jsStr=json_object_new_string("1");
992        json_object_object_add(res,"hpc_cpu_usage",jsStr);
993      }
994      json_object *jsStr=json_object_new_string("succeeded");
995      json_object_object_add(res,"wps_status",jsStr);
996      break;
997    }
998     
999    case 7: {
1000      // Error or Dismiss
1001      sid=getMapFromMaps(conf,"lenv","message");
1002      if(sid!=NULL){
1003        json_object *jsStr=json_object_new_string(sid->value);
1004        json_object_object_add(res,"message",jsStr);
1005      }
1006      json_object *jsStr;
1007      if(state==1)
1008        jsStr=json_object_new_string("dismissed");
1009      else
1010        jsStr=json_object_new_string("failed");
1011      json_object_object_add(res,"wps_status",jsStr);
1012      break;
1013    }
1014    others: {
1015        break;
1016      }
1017    }
1018
1019    if(local_arguments==NULL)
1020      local_arguments=(local_params**)malloc(sizeof(local_params*));
1021    else
1022      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
1023    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
1024    local_arguments[nbThreads]->conf=conf;
1025    local_arguments[nbThreads]->url=url;
1026    local_arguments[nbThreads]->res=res;
1027    local_arguments[nbThreads]->step=step;
1028    local_arguments[nbThreads]->state=state;
1029    if(myThreads==NULL)
1030      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
1031    else
1032      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
1033    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
1034      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
1035      return false;
1036    }
1037    nbThreads++;
1038    return true;
1039  }
1040#endif
1041 
1042  /**
1043   * Wait for the threads to end then, clean used memory.
1044   */
1045  void cleanupCallbackThreads(){
1046    while( isOngoing>0 ){
1047      zSleep(100);
1048    }
1049    int i=0;
1050    for(i=0;i<nbThreads;i++){
1051      pthread_join(myThreads[i],NULL);
1052      free(local_arguments[i]);
1053    }
1054    free(local_arguments);
1055    free(myThreads);
1056  }
1057
1058#ifdef __cplusplus
1059}
1060#endif
Note: See TracBrowser for help on using the repository browser.

Search

Context Navigation

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png