source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 863

Last change on this file since 863 was 863, checked in by djay, 7 years ago

Change the default ZOO-Kernel behavior, if an input has been passed by reference, the ZOO-Service will receive a cache_file map rather than the value field which was usually returned, same for array value apply. To use the previous behavior, one can add "memory=load" to the main section of the main.cfg file. Update ZOO-Services for using this new field if available.

  • Property svn:keywords set to Id
File size: 25.3 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
32#include "sqlapi.h"
33#include <pthread.h>
34#include <libxml/tree.h>
35#include <libxml/parser.h>
36#include <libxml/xpath.h>
37#include <libxml/xpathInternals.h>
38
39#include <libxslt/xslt.h>
40#include <libxslt/xsltInternals.h>
41#include <libxslt/transform.h>
42#include <libxslt/xsltutils.h>
43
44#ifdef __cplusplus
45extern "C" {
46#endif
47
48  /**
49   * Parameter definition to be used for sending parameters to a thread.
50   */
51  typedef struct {
52    maps *conf;      //!< the main configuration file
53    map *url;        //!< the callback url maps
54    json_object *res;//!< the JSON object to post
55    int step;        //!< the current step [0,6]
56    int state;       //!< the current state [0,1]
57  } local_params;
58
59  /**
60   * Number of threads
61   */
62  int nbThreads=0;
63  /**
64   * Current step
65   */
66  int cStep=0;
67  /**
68   * Is there any ongoing HTTP request
69   */
70  int isOngoing=0;
71  /**
72   * Threads array
73   */
74  pthread_t* myThreads=NULL;
75  /**
76   * Steps array
77   */
78  bool steps[7][2]={
79    {false,false},
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false}
86  };
87  /**
88   * Arguments array to give to the _invokeCallback thread's function
89   */
90  local_params** local_arguments;
91 
92  /**
93   * Check if a service name is prohibited, meaning that the Kernel doesn't have
94   * to invoke the callback for this specific service.
95   *
96   * @param conf the main configuration file maps
97   * @param serviceName the serviceName
98   * @return a bool true if the service is prohibited, false in other case
99   */
100  bool isProhibited(maps* conf,const char* serviceName){
101    map* plist=getMapFromMaps(conf,"callback","prohibited");
102    if(plist!=NULL){
103      char *tmp=plist->value;
104      char *tmpS=strtok(tmp,",");
105      while(tmpS!=NULL){
106        if(strcmp(serviceName,tmpS)==0)
107          return true;
108        tmpS=strtok(NULL,",");
109      }
110    }
111    return false;
112  }
113
114  /**
115   * Practically invoke the callback, meaning sending the HTTP POST request.
116   *
117   * @param args local_params containing all the variables required
118   */
119  void* _invokeCallback(void* args){
120    local_params* arg=(local_params*)args;
121    HINTERNET hInternet,res1;
122    const struct tm *tm;
123    size_t len;
124    time_t now;
125    char *tmp1;
126    map *tmpStatus;
127    maps* tmpConf=createMaps("main");
128    tmpConf->content=createMap("memory","load");
129    hInternet=InternetOpen("ZooWPSClient\0",
130                           INTERNET_OPEN_TYPE_PRECONFIG,
131                           NULL,NULL, 0);
132    if(!CHECK_INET_HANDLE(hInternet)){
133      InternetCloseHandle (&hInternet);
134      return false;
135    }
136    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
137    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
138    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
139    hInternet.waitingRequests[0] = zStrdup(URL);
140    free(URL);
141#ifdef CALLBACK_DEBUG
142    now = time ( NULL );
143    tm = localtime ( &now );
144    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
145    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
146    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
147    fprintf(stderr," * JSON: [%s] \n",jsonStr);
148    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
149    fprintf(stderr," * DATE: %s/ \n\n",tmp1);
150    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
151    free(tmp1);
152#endif
153    while( (arg->step!=7 || isOngoing>0) &&
154           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
155           ){
156      zSleep(100);
157    }
158    isOngoing=1;
159#ifdef CALLBACK_DEBUG
160    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
161    int i=0;
162    for(i=0;i<7;i++){
163      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
164    }
165#endif
166   
167    now = time ( NULL );
168    tm = localtime ( &now );
169   
170    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
171    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
172
173#ifdef CALLBACK_DEBUG   
174    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
175    fflush(stderr);
176#endif   
177    free(tmp1);
178    res1 = InternetOpenUrl (&hInternet,
179                            hInternet.waitingRequests[0], 
180                            (char*)jsonStr, strlen(jsonStr),
181                            INTERNET_FLAG_NO_CACHE_WRITE,
182                            0,tmpConf);
183    AddHeaderEntries(&hInternet,arg->conf);
184    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
185    processDownloads(&hInternet);
186    freeMaps(&tmpConf);
187    free(tmpConf);
188    now = time ( NULL );
189    tm = localtime ( &now );
190    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
191    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
192   
193#ifdef CALLBACK_DEBUG   
194    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
195#endif
196    free(tmp1);
197    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
198                                 * sizeof (char));
199    if (tmp == NULL)
200      {
201        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
202        setMapInMaps(arg->conf,"lenv","code","InternalError");
203        return NULL;
204      }
205    size_t bRead;
206    InternetReadFile (hInternet.ihandle[0],
207                      (LPVOID) tmp,
208                      hInternet.
209                      ihandle[0].nDataLen,
210                      &bRead);
211    tmp[hInternet.ihandle[0].nDataLen] = 0;
212    json_object_put(arg->res);
213    InternetCloseHandle(&hInternet);
214    isOngoing=0;
215    if(cStep==0 || cStep==6 || arg->state==1)
216      cStep=arg->step+1;
217#ifdef CALLBACK_DEBUG
218    now = time ( NULL );
219    tm = localtime ( &now );
220    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
221    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
222    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT (%s)\n",pthread_self(),__FILE__,__LINE__,,tmp1);
223    for(i=0;i<7;i++){
224      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
225    }
226    fprintf(stderr,"Result: \n%s\n\n",tmp);
227    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
228    fflush(stderr);
229    free(tmp1);
230#endif
231    steps[arg->step][arg->state]=true;
232    free(tmp);
233#ifdef CALLBACK_DEBUG
234    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
235    fflush(stderr);
236#endif
237    pthread_exit(NULL);
238  }
239 
240  /**
241   * Invoke the callback in case there is a [callback] section containing a url parameter
242   *
243   * @param m the maps containing the main configuration file definitions
244   * @param inputs the inputs defined in the request (can be null if not yet initialized)
245   * @param inputs the outputs provided in the request (can be null if not yet initialized)
246   * @param step the step number, steps are defined as:
247   *  0: Analyze creation
248   *  1: Fetching Data Inputs
249   *  2: Uploading data inputs to cluster
250   *  3: Creating Job Script
251   *  4: Submitting Job to Cluster
252   *  5: Downloading processed output from cluster
253   *  6: Finalize
254   *  7: Dismiss or Error
255   * @param state 0 in case the step starts, 1 when it ends
256   * @return bool true in case of success, false in other cases
257   */
258  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
259    map* url=getMapFromMaps(conf,"callback","url");
260    if(url==NULL)
261      return false;
262     
263    maps* lenv=getMaps(conf,"lenv");
264    map* sname=getMap(lenv->content,"identifier");
265    if(sname!=NULL && isProhibited(conf,sname->value))
266      return false;
267     
268    json_object *res=json_object_new_object();
269
270    map* sid=getMapFromMaps(conf,"lenv","usid");
271    if(sid!=NULL){
272      json_object *jsStr=json_object_new_string(sid->value);
273      json_object_object_add(res,"jobid",jsStr);
274    }
275    const struct tm *tm;
276    size_t len;
277    time_t now;
278    char *tmp1;
279    map *tmpStatus;
280 
281    now = time ( NULL );
282    tm = localtime ( &now );
283
284    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
285    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
286    json_object *jsStr0=json_object_new_string(tmp1);
287    json_object_object_add(res,"datetime",jsStr0);
288    free(tmp1);
289   
290    switch(step){
291    case 0: {
292      // Create a new analyze
293      maps* lenv=getMaps(conf,"lenv");
294      sid=getMapFromMaps(conf,"renv","xrequest");
295      if(sid!=NULL){
296        json_object *jsStr=json_object_new_string(sid->value);
297        json_object_object_add(res,"request_execute_content",jsStr);
298      }
299      sid=getMapFromMaps(conf,"lenv","identifier");
300      if(sid!=NULL){
301        json_object *jsStr=json_object_new_string(sid->value);
302        json_object_object_add(res,"process_identifier",jsStr);
303      }
304      // Save the Execute request on disk
305      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
306      map* req=getMapFromMaps(conf,"renv","xrequest");
307      sid=getMapFromMaps(conf,"lenv","usid");
308      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
309      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
310      FILE* saveExecute=fopen(executePath,"wb");
311      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
312      fflush(saveExecute);
313      fclose(saveExecute);
314      setMapInMaps(conf,"lenv","execute_file",executePath);
315      free(executePath);
316      break;
317    }
318     
319    case 1: {
320      // Update the execute request stored on disk at step 0,0 to modify the references used.
321      if(state==1){
322        maps* curs=inputs;
323        xmlInitParser();
324        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
325        while(curs!=NULL){
326          map* length=getMap(curs->content,"length");
327          map* useMS=getMap(curs->content,"useMapserver");
328          if(length==NULL){
329            addToMap(curs->content,"length","1");
330            length=getMap(curs->content,"length");
331          }
332          int len=atoi(length->value);
333          for(int ii=0;ii<len;ii++){
334            if(getMapArray(curs->content,"byValue",ii)!=NULL && getMapArray(curs->content,"mimeType",ii)!=NULL && useMS!=NULL && strncasecmp(useMS->value,"true",4)==0){
335              map* tmpMap=getMapArray(curs->content,"value",ii);
336              char tmpStr[100];
337              sprintf(tmpStr,"%d",strlen(tmpMap->value));
338              setMapArray(curs->content,"size",ii,tmpStr);
339              tmpMap=getMapArray(curs->content,"mimeType",ii);
340              setMapArray(curs->content,"fmimeType",ii,tmpMap->value);
341              tmpMap=getMapArray(curs->content,"cache_file",ii);
342              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
343              setMapArray(curs->content,"storage",ii,tmpMap->value);
344              setReferenceUrl(conf,curs);
345              addIntToMap(curs->content,"published_id",ii+1);
346              const char *params[7];
347              int xmlLoadExtDtdDefaultValue;
348              int hasFile=-1;
349              map* xslPath=getMapFromMaps(conf,"callback","template");
350              map* filePath=getMapArray(curs->content,"ref_wfs_link",ii);
351              if(filePath==NULL)
352                filePath=getMap(curs->content,"ref_wcs_link");
353              char* inputName=curs->name;
354              if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
355                break;
356              char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
357              char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
358              char tmpParam2[16];
359              sprintf(tmpParam2,"string(\"%d\")",ii);
360              setMapArray(curs->content,"href",ii,filePath->value);
361              setMapArray(curs->content,"xlink:href",ii,filePath->value);
362              tmpMap=getMapArray(curs->content,"cache_url",ii);
363              if(tmpMap!=NULL)
364                setMapArray(curs->content,"xlink:href",ii,tmpMap->value);
365              else
366                setMapArray(curs->content,"xlink:href",ii,filePath->value);
367              sprintf(tmpParam,"string(\"%s\")",curs->name);
368              sprintf(tmpParam1,"string(\"%s\")",filePath->value);
369              sprintf(tmpParam2,"string(\"%d\")",ii);
370              params[0]="attr";
371              params[1]=tmpParam;
372              params[2]="value";
373              params[3]=tmpParam1;//filePath->value;
374              params[4]="cnt";
375              params[5]=tmpParam2;
376              params[6]=NULL;
377              fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s INDEX: %s\n",
378                      tmpParam,tmpParam1,tmpParam2);
379              fflush(stderr);
380              xmlSubstituteEntitiesDefault(1);
381              xmlLoadExtDtdDefaultValue = 0;
382              xsltStylesheetPtr cur = NULL;
383              xmlDocPtr doc, res;
384              cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
385              doc = xmlParseFile(xmlPath->value);
386              fflush(stderr);
387              res = xsltApplyStylesheet(cur, doc, params);
388              xmlChar *xmlbuff;
389              int buffersize;
390              xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
391              // Store the executeRequest in file again
392              free(tmpParam);
393              free(tmpParam1);
394              fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
395              fflush(stderr);
396              FILE* saveExecute=fopen(xmlPath->value,"wb");
397              if(saveExecute!=NULL){
398                fwrite(xmlbuff,1,buffersize,saveExecute);
399                fflush(saveExecute);
400                fclose(saveExecute);
401              }
402              xmlFree(xmlbuff);
403              xmlFreeDoc(doc);
404              xsltFreeStylesheet(cur);
405            }
406          }
407          addIntToMap(curs->content,"published_id",0);
408          curs=curs->next;
409        }
410        xmlCleanupParser();
411        FILE* f0=fopen(xmlPath->value,"rb");
412        if(f0!=NULL){
413          long flen;
414          char *fcontent;
415          fseek (f0, 0, SEEK_END);
416          flen = ftell (f0);
417          fseek (f0, 0, SEEK_SET);
418          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
419          fread(fcontent,flen,1,f0);
420          fcontent[flen]=0;
421          fclose(f0);
422          map *schema=getMapFromMaps(conf,"database","schema");
423          map* sid=getMapFromMaps(conf,"lenv","usid");
424          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
425          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
426          execSql(conf,1,req);
427          free(fcontent);
428          free(req);
429        }
430      }
431
432      // Fetching data inputs
433      maps* curs=inputs;
434      dumpMaps(curs);
435      char *keys[11][2]={
436        {
437          "xlink:href",
438          "ref_download_link"
439        },
440        {
441          "cache_file",
442          "cachefile"
443        },
444        {
445          "fmimeType",
446          "mimetype"
447        },
448        {
449          "size",
450          "size"
451        },
452        {
453          "ref_wms_link",
454          "ref_wms_link"
455        },
456        {
457          "ref_wfs_link",
458          "ref_wfs_link"
459        },
460        {
461          "ref_wcs_link",
462          "ref_wcs_link"
463        },
464        {
465          "ref_wcs_link",
466          "ref_wcs_link"
467        },
468        {
469          "ref_wcs_preview_link",
470          "ref_wcs_preview_link"
471        },
472        {
473          "geodatatype",
474          "datatype"
475        },
476        {
477          "wgs84_extent",
478          "boundingbox"
479        }       
480      };
481      json_object *res1=json_object_new_object();
482      while(curs!=NULL){
483        if(getMap(curs->content,"length")==NULL){
484          addToMap(curs->content,"length","1");
485        }
486        map* length=getMap(curs->content,"length");
487        int len=atoi(length->value);
488        json_object *res3;
489        int hasRef=-1;
490        for(int ii=0;ii<len;ii++){
491          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
492          sid=getMapArray(curs->content,"ref_wms_link",ii);
493          json_object *res2=json_object_new_object();
494          if(tmpMap!=NULL){
495            if(sid==NULL){
496              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
497              setMapArray(curs->content,"storage",ii,tmpMap->value);
498            }
499            struct stat buf;
500            char timeStr[ 100 ] = "";
501            if (stat(tmpMap->value, &buf)==0){
502              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
503              json_object *jsStr=json_object_new_string(timeStr);
504              json_object_object_add(res2,"creation_date",jsStr);
505            }
506            tmpMap=getMapArray(curs->content,"fmimeType",ii);
507            if(tmpMap!=NULL){
508              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
509            }
510            setReferenceUrl(conf,curs);
511          }else{         
512          }
513          addIntToMap(curs->content,"published_id",ii+1);
514          int i=0;
515          for(;i<11;i++){
516            sid=getMapArray(curs->content,keys[i][0],ii);
517            if(sid!=NULL){
518              json_object *jsStr=json_object_new_string(sid->value);
519              json_object_object_add(res2,keys[i][1],jsStr);
520              if(i==0){
521                hasRef=1;
522                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
523                json_object_object_add(res2,"dataOrigin",jsStr1);
524              }
525            }
526          }
527          if(len>1){
528            if(ii==0)
529              res3=json_object_new_array();
530            json_object_array_add(res3,res2);
531          }else
532            res3=res2;
533        }
534        if(hasRef<0)
535          json_object_put(res3);
536        else{
537          json_object_object_add(res1,curs->name,json_object_get(res3));
538          json_object_put(res3);
539        }
540        addIntToMap(curs->content,"published_id",0);
541        curs=curs->next;
542      }
543      json_object_object_add(res,"inputs",res1);
544      break;
545    }
546     
547    case 2: {
548      // Uploading data input to cluster
549      maps* in=getMaps(conf,"uploadQueue");
550      if(in!=NULL){
551        maps* curs=in;
552        map* length=getMapFromMaps(in,"uploadQueue","length");
553        if(length!=NULL){
554          json_object *res1=json_object_new_object();
555          int limit=atoi(length->value);
556          int i=0;
557          maps* uploadQueue=getMaps(in,"uploadQueue");
558          map* tmp=uploadQueue->content;
559          for(;i<limit;i++){
560            map* tmp0=getMapArray(tmp,"input",i);
561            map* tmp1=getMapArray(tmp,"localPath",i);
562            map* tmp2=getMapArray(tmp,"targetPath",i);
563            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
564              json_object *res2=json_object_new_object();
565              json_object *jsStr=json_object_new_string(tmp1->value);
566              json_object_object_add(res2,"local_path",jsStr);
567              jsStr=json_object_new_string(tmp2->value);
568              json_object_object_add(res2,"target_path",jsStr);
569              json_object *res4=json_object_object_get(res1,tmp0->value);
570              if(json_object_is_type(res4,json_type_null)){
571                json_object_object_add(res1,tmp0->value,res2);
572              }else{
573                if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){
574                  json_object *res3=json_object_new_array();
575                  json_object_array_add(res3,json_object_get(res4));
576                  json_object_array_add(res3,res2);
577                  json_object_object_del(res1,tmp0->value);
578                  json_object_object_add(res1,tmp0->value,res3);
579                }else
580                  json_object_array_add(res4,res2);
581              }
582            }
583          }
584          json_object_object_add(res,"inputs",res1);
585        }
586      }
587      break;
588    }
589     
590    case 3: {
591      // Generating job script
592      sid=getMapFromMaps(conf,"lenv","local_script");
593      if(sid!=NULL){
594        json_object *jsStr=json_object_new_string(sid->value);
595        json_object_object_add(res,"script",jsStr);
596      }
597      break;
598    }
599     
600    case 4: {
601      // Submitting job to cluster
602      sid=getMapFromMaps(conf,"lenv","remote_script");
603      if(sid!=NULL){
604        json_object *jsStr=json_object_new_string(sid->value);
605        json_object_object_add(res,"script",jsStr);
606      }
607      break;
608    }
609     
610    case 5: {
611      // Downloading process outputs from cluster
612      maps* curs=outputs;
613      dumpMaps(curs);
614      char *keys[10][2]={
615        {
616          "Reference",
617          "ref"
618        },
619        {
620          "generated_file",
621          "cachefile"
622        },
623        {
624          "mimeType",
625          "mimetype"
626        },
627        {
628          "size",
629          "size"
630        },
631        {
632          "geodatatype",
633          "datatype"
634        },
635        {
636          "wgs84_extent",
637          "boundingbox"
638        },
639        {
640          "ref_wms_link",
641          "ref_wms_link"
642        },
643        {
644          "ref_wcs_link",
645          "ref_wcs_link"
646        },
647        {
648          "ref_wcs_preview_link",
649          "ref_wcs_preview_link"
650        },
651        {
652          "ref_wfs_link",
653          "ref_wfs_link"
654        }       
655      };
656      char* specifics[5][2]={
657        {
658          "download_link",
659          "ref_download_link"
660        },
661        {
662          "wms_link",
663          "ref_wms_link"
664        },
665        {
666          "wfs_link",
667          "ref_wfs_link"
668        },
669        {
670          "wcs_link",
671          "ref_wcs_link"
672        },
673        {
674          "wcs_link",
675          "ref_wcs_preview_link"
676        }
677      };
678      json_object *res1=json_object_new_object();
679      while(curs!=NULL){       
680        json_object *res2=json_object_new_object();
681        int i=0;
682        int hasRef=-1;
683        for(;i<10;i++){
684          sid=getMap(curs->content,keys[i][0]);
685          if(sid!=NULL){
686            json_object *jsStr=json_object_new_string(sid->value);
687            json_object_object_add(res2,keys[i][1],jsStr);
688            if(i==0){
689              hasRef=1;
690              json_object_object_add(res2,"ref_download_link",jsStr);
691            }
692            if(i==1){
693              struct stat buf;
694              char timeStr[ 100 ] = "";
695              if (stat(sid->value, &buf)==0){
696                strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
697                json_object *jsStr=json_object_new_string(timeStr);
698                json_object_object_add(res2,"creation_date",jsStr);
699              }
700            }
701          }
702        }
703        if(hasRef>0)
704          json_object_object_add(res1,curs->name,res2);
705        else{
706          maps* curs0=curs->child;
707          int i=0;
708          int bypass=-1;
709          for(i=0;i<5;i++){
710            maps* specificMaps;
711            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
712              int hasRef0=-1;
713              int i0=0;
714              for(;i0<6;i0++){
715                sid=getMap(specificMaps->content,keys[i0][0]);
716                if(sid!=NULL){
717                  json_object *jsStr=json_object_new_string(sid->value);
718                  if(i0==0){
719                    json_object_object_add(res2,specifics[i][1],jsStr);
720                  }
721                  else
722                    json_object_object_add(res2,keys[i0][1],jsStr);
723                  hasRef0=1;
724                  bypass=1;
725                  if(i==1){
726                    struct stat buf;
727                    char timeStr[ 100 ] = "";
728                    if (stat(sid->value, &buf)==0){
729                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
730                      json_object *jsStr=json_object_new_string(timeStr);
731                      json_object_object_add(res2,"creation_date",jsStr);
732                    }
733                  }
734                }
735              }       
736            }
737          }
738          if(bypass<0)
739            while(curs0!=NULL){
740              json_object *res3=json_object_new_object();
741              int i0=0;
742              int hasRef0=-1;
743              for(;i0<10;i0++){
744                sid=getMap(curs0->content,keys[i0][0]);
745                if(sid!=NULL){
746                  json_object *jsStr=json_object_new_string(sid->value);
747                  json_object_object_add(res3,keys[i0][1],jsStr);
748                  hasRef0=1;
749                }
750              }
751              if(hasRef0<0)
752                json_object_put(res3);
753              else
754                json_object_object_add(res2,curs0->name,res3);
755              curs0=curs0->next;
756            }
757          json_object_object_add(res1,curs->name,res2);
758        }
759        curs=curs->next;
760      }
761      json_object_object_add(res,"outputs",res1);
762      break;
763    }
764     
765    case 6: {
766      // Finalize HPC
767      char *keys[6][2]={
768        {
769          "SubmitTime",
770          "hpc_submission_date"
771        },
772        {
773          "JobId",
774          "hpc_job_identifier"
775        },
776        {
777          "JobName",
778          "hpc_job_name"
779        },
780        {
781          "StartTime",
782          "hpc_start_date"
783        },
784        {
785          "EndTime",
786          "hpc_end_date"
787        },
788        {
789          "JobState",
790          "hpc_status"
791        }       
792      };
793      int i=0;
794      if(getMaps(conf,"henv")!=NULL){
795        for(i=0;i<6;i++){
796          sid=getMapFromMaps(conf,"henv",keys[i][0]);
797          if(sid!=NULL){
798            json_object *jsStr=json_object_new_string(sid->value);
799            json_object_object_add(res,keys[i][1],jsStr);
800          }
801        }
802      }
803      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
804        json_object *jsStr=json_object_new_string(sid->value);
805        json_object_object_add(res,"hpc_cpu_usage",jsStr);
806      }else{
807        json_object *jsStr=json_object_new_string("1");
808        json_object_object_add(res,"hpc_cpu_usage",jsStr);
809      }
810      json_object *jsStr=json_object_new_string("succeeded");
811      json_object_object_add(res,"wps_status",jsStr);
812      break;
813    }
814     
815    case 7: {
816      // Error or Dismiss
817      sid=getMapFromMaps(conf,"lenv","message");
818      if(sid!=NULL){
819        json_object *jsStr=json_object_new_string(sid->value);
820        json_object_object_add(res,"message",jsStr);
821      }
822      json_object *jsStr;
823      if(state==1)
824        jsStr=json_object_new_string("dismissed");
825      else
826        jsStr=json_object_new_string("failed");
827      json_object_object_add(res,"wps_status",jsStr);
828      break;
829    }
830    others: {
831        break;
832      }
833    }
834
835    if(local_arguments==NULL)
836      local_arguments=(local_params**)malloc(sizeof(local_params*));
837    else
838      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
839    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
840    local_arguments[nbThreads]->conf=conf;
841    local_arguments[nbThreads]->url=url;
842    local_arguments[nbThreads]->res=res;
843    local_arguments[nbThreads]->step=step;
844    local_arguments[nbThreads]->state=state;
845    if(myThreads==NULL)
846      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
847    else
848      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
849    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
850      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
851      return false;
852    }
853    nbThreads++;
854    return true;
855  }
856
857  /**
858   * Wait for the threads to end then, clean used memory.
859   */
860  void cleanupCallbackThreads(){
861    int i=0;
862    for(i=0;i<nbThreads;i++){
863      pthread_join(myThreads[i],NULL);
864      free(local_arguments[i]);
865    }
866    free(local_arguments);
867    free(myThreads);
868  }
869
870#ifdef __cplusplus
871}
872#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png